您正在multiprocessing跨多个进程运行一些代码,而这些代码只是坐在那里。卡住了

您检查CPU使用率-什么都没有发生,它没有做任何工作。

这是怎么回事?

在许多情况下,您可以使用一行代码来解决此问题(跳到最后进行尝试),但是首先,是时候深入研究Python的缺陷和POSIX系统编程的痛苦了,使用令人兴奋的方法,而不是很简单。令人信服的鲨鱼主题隐喻!

让我们设置一个比喻的场景:您在充满鲨鱼的游泳池里游泳。(鲨鱼是流程的隐喻。)

接下来,您拿叉子。(叉子是的隐喻fork()。)

您用叉子刺了一下自己。刺刺刺。鲜血开始渗出,鲨鱼开始盘旋,很快你就会发现自己死了-死在水中!

在穿越时空的旅程中,您将遇到:

  • 神秘的失败,其中Python multiprocessing.Pool陷入了僵局。
  • 神秘的根源:fork()
  • 一个难题,其中fork()复制所有内容都是问题,而fork()不是复制所有内容也是一个问题。
  • 一些创可贴不能止血。
  • 该解决方案可以防止您的代码被鲨鱼吃掉。

让我们开始!

简介 multiprocessing.Pool

Python提供了一个方便的模块,允许您在进程池中运行任务,这是改善程序并行性的好方法。(请注意,这些示例均未在Windows上进行过测试;我在这里重点关注* nix平台。)

from multiprocessing import Pool
from os import getpid

def double(i):
    print("I'm process", getpid())
    return i * 2

if __name__ == '__main__':
    with Pool() as pool:
        result = pool.map(double, [1, 2, 3, 4, 5])
        print(result) 
1
2
3
4
5
6
7
8
9
10
11

如果运行此命令,则会得到:

I'm process 4942
I'm process 4943
I'm process 4944
I'm process 4942
I'm process 4943
[2, 4, 6, 8, 10] 
1
2
3
4
5
6

如您所见,该double()函数在不同的进程中运行。

一些代码应该起作用,但是不起作用

不幸的是,尽管该Pool课程很有用,但也充满了恶毒的鲨鱼,只等您犯了一个错误。例如,以下完全合理的代码:

import logging
from threading import Thread
from queue import Queue
from logging.handlers import QueueListener, QueueHandler
from multiprocessing import Pool

def setup_logging():
    # Logs get written to a queue, and then a thread reads
    # from that queue and writes messages to a file:
    _log_queue = Queue()
    QueueListener(
        _log_queue, logging.FileHandler("out.log")).start()
    logging.getLogger().addHandler(QueueHandler(_log_queue))

    # Our parent process is running a thread that
    # logs messages:
    def write_logs():
        while True:
            logging.error("hello, I just did something")
    Thread(target=write_logs).start()

def runs_in_subprocess():
    print("About to log...")
    logging.error("hello, I did something")
    print("...logged")

if __name__ == '__main__':
    setup_logging()

    # Meanwhile, we start a process pool that writes some
    # logs. We do this in a loop to make race condition more
    # likely to be triggered.
    while True:
        with Pool() as pool:
            pool.apply(runs_in_subprocess) 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

该程序的作用如下:

  1. 在父进程中,日志消息被路由到队列,并且线程从队列中读取并将这些消息写入日志文件。
  2. 另一个线程编写连续的日志消息流。
  3. 最后,我们启动一个进程池,并在一个子子进程中记录一条消息。

如果我们在Linux上运行该程序,则会得到以下输出:

About to log...
...logged
About to log...
...logged
About to log...
<at this point the program freezes> 
1
2
3
4
5
6

为什么该程序冻结?

如何在POSIX(以前称为Unix的标准)上启动子流程

要了解正在发生的事情,您需要了解如何在POSIX(即Linux,BSD,macOS等)上启动子流程。

  1. 使用fork()系统调用创建该过程的副本。
  2. 子进程使用execve()系统调用(或其变体之一,例如execl())将自己替换为其他程序。

关键是,没有什么可以阻止您仅做某事fork()。例如,在这里我们fork()然后打印当前进程的进程ID(PID):

from os import fork, getpid

print("I am parent process", getpid())
if fork():
    print("I am the parent process, with PID", getpid())
else:
    print("I am the child process, with PID", getpid()) 
1
2
3
4
5
6
7

当我们运行它时:

I am parent process 3619
I am the parent process, with PID 3619
I am the child process, with PID 3620 
1
2
3

如您所见,父级(PID 3619)和子级(PID 3620)都继续运行相同的Python代码。

这就是有趣的地方:- fork()仅是Python在Linux上默认创建进程池的方式,而在Python 3.7和更早版本的macOS上创建进程池的方式。

只有这个问题fork()ING

好的,Python只需执行即可启动一个进程池fork()。这似乎很方便:子进程可以访问父进程内存中所有内容的副本(尽管子进程无法再_更改_父进程中的任何内容)。但是,这到底是什么造成了我们看到的僵局?

原因是在fork()-without- 之后继续运行代码的两个问题execve()

  1. fork() 复制内存中的所有内容。
  2. 但这并不能复制_所有内容_。

fork() 复制内存中的所有内容

执行时fork(),它将复制内存中的所有内容。这包括您在导入的Python模块中设置的所有全局变量。

例如,您的logging配置:

import logging
from multiprocessing import Pool
from os import getpid

def runs_in_subprocess():
    logging.info(
        "I am the child, with PID {}".format(getpid()))

if __name__ == '__main__':
    logging.basicConfig(
        format='GADZOOKS %(message)s', level=logging.DEBUG)

    logging.info(
        "I am the parent, with PID {}".format(getpid()))

    with Pool() as pool:
        pool.apply(runs_in_subprocess) 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

当我们运行该程序时,我们得到:

GADZOOKS I am the parent, with PID 3884
GADZOOKS I am the child, with PID 3885 
1
2

注意池中的子进程如何继承父进程的日志记录配置,即使这不是您的意图!更广泛地讲,您在父级中在模块级别配置的_任何内容_都将被池中的进程继承,这可能导致某些意外行为。

fork()不会复制所有内容

第二个问题是fork()实际上并没有复制所有内容。特别是,fork() _没有_复制的一件事是线程。在父进程中运行的任何线程在子进程中都不存在。

from threading import Thread, enumerate
from os import fork
from time import sleep

# Start a thread: Thread(target=lambda: sleep(60)).start()

if fork():
    print("The parent process has {} threads".format(
        len(enumerate())))
else:
    print("The child process has {} threads".format(
        len(enumerate()))) 
1
2
3
4
5
6
7
8
9
10
11
12

当我们运行该程序时,我们看到启动的线程无法幸存fork()

The parent process has 2 threads
The child process has 1 threads 
1
2

谜团解决了

这就是原始程序陷入僵局的原因-结合它们的力量,这两个问题fork()-只会产生一个更大,更严重的问题:

  1. 只要父进程中的线程写入日志消息,就会将其添加到中Queue。这涉及获取锁。
  2. 如果fork()在错误的时间发生,则将锁定以获取状态复制。
  3. 子进程复制父进程的日志记录配置(包括队列)。
  4. 每当子进程写入日志消息时,它将尝试将其写入队列。
  5. 这意味着要获取锁,但是已经获取了锁。
  6. 现在,子进程等待释放锁。
  7. 永远不会释放该锁,因为不会释放该锁的线程fork()

简化形式:

from os import fork
from threading import Lock

# Lock is acquired in the parent process: lock = Lock()
lock.acquire()

if fork() == 0:
    # In the child process, try to grab the lock:
    print("Acquiring lock...")
    lock.acquire()
    print("Lock acquired! (This code will never run)") 
1
2
3
4
5
6
7
8
9
10
11

创可贴和解决方法

有一些变通办法可以使它更好一些。

对于模块状态,logging当子进程由启动时,库可以重置其配置multiprocessing.Pool。但是,这不能解决所有_其他_设置某种模块级别全局状态的_其他_ Python模块和库的问题。每个这样做的库都需要修复才能与一起使用multiprocessing

对于线程,可以在fork()调用时将锁设置回释放状态(Python对此提供了票证。)不幸的是,这不能解决C库创建的锁的问题,它只能解决Python直接创建的锁。它并没有解决这样的事实,即无论这些锁是否已释放,这些锁在子进程中实际上都不再有意义。

幸运的是,有一个更好,更轻松的解决方案。

真正的解决方案:停止fork()抱怨

在Python 3中,该multiprocessing库添加了启动子流程的新方法。其中一个做了fork()之后是execve()一个完全新的Python程序。这就解决了我们的问题,因为模块状态不被子进程继承:它从头开始。

启用此备用配置需要在程序的任何其他导入或使用之前只更改程序中的两行代码multiprocessing。基本上,您的应用程序要做的第一件事应该是:

from multiprocessing import set_start_method
set_start_method("spawn") 
1
2

这会全局更改程序中所有代码的内容,因此,如果要维护库,那么有礼貌的事情是仅对自己的池使用“ spawn”方法,如下所示:

from multiprocessing import get_context

def your_func():
    with get_context("spawn").Pool() as pool:
        # ... everything else is unchanged 
1
2
3
4
5

就是这样:这样做,我们已经解决的所有问题都不会影响您。(有关详细信息,请参见上下文文档。)

但这仍然需要_您_去做。并且它要求每个信任地遵循文档中示例的Python用户都要弄清楚为什么他们的程序有时会出错。

当前的默认设置已被破坏,在理想情况下,Python会对此进行记录,或者最好将其更改为不再是默认设置。

了解更多

我在这里的解释当然有所简化:例如,除了线程之外,还有其他状态fork()无法复制。以下是一些其他资源:

保持安全,并注意鲨鱼和线程与进程之间的不良交互!🦈🦑

感谢Terry Reedy指出了需求if __name__ == '__main__'