开发手册 欢迎您!
软件开发者资料库

Python multiprocessing 多进程间通信传递DataFrame的方法

进程是资源的集合,是最小的资源单位。是一个程序在一个数据集上的一次动态执行过程。进程一般由程序、数据集、进程控制块三部分组成。多进程适合执行计算密集型任务(如:视频编码解码、数据处理、科学计算等)、可以分解为多个并行子任务并能合并子任务执行结果的任务,以及在内存使用方面没有任何限制且不强依赖于I/O操作的任务。本文主要介绍Python 中 multiprocessing 多进程间通信传递值方法,以及相关的示例代码。

1、Python 多进程

参考文档:Python 异步编程 多进程

2、使用multiprocessing.Manager的Namespace()实现

可以使用multiprocessing.Manager为所有进程提供单例DataFrame实例。有几种不同的方法可以达到相同的效果,但可能最简单的方法是将DataFrame放入multiprocessing.Manager实例的Namespace中。

from multiprocessing import Manager,Processimport pandas as pdimport timedf = pd.DataFrame([[10,6,7,8],[1,9,12,14],[5,8,10,6]],columns = ['a','b','c','d'])mgr = Manager()ns = mgr.Namespace()ns.df = dfdef worker(ns):print(ns.df)time.sleep(1)print("end")#另一个进程p = Process(target=worker, args=(ns,))p.start()p.join()

3、使用BaseManager和SyncManager实现

使用Python的BaseManagerSyncManager类使用客户端/服务器设置。首先设置一个服务器,为数据提供代理类。代码如下,

1)DataServer.py

#!/usr/bin/pythonfrom    multiprocessing.managers import SyncManagerimport  numpyimport pandas as pd# Global for storing the data to be servedgData = {}#不同进程共享的代理类#不要把大数据放在这里,因为那会迫使它被管道传输到# other进程在那里实例化时,而是只返回一部分当请求时,#全局数据。class DataProxy(object):    def __init__(self):        pass    def getData(self, key, default=None):        global gData        return gData.get(key, None)if __name__ == '__main__':    port  = 5000    gData[1] = pd.DataFrame([[10,6,7,8],[1,9,12,14],[5,8,10,6]],columns = ['a','b','c','d'])    # Start the server on address(host,port)    print('Serving data. Press -c to stop.')    class myManager(SyncManager): pass    myManager.register('DataProxy', DataProxy)    mgr = myManager(address=('', port), authkey='DataProxy01'.encode())    server = mgr.get_server()    server.serve_forever()

2)DataClient.py

from   multiprocessing.managers import BaseManagerimport psutil   #用于获取进程信息# 获取共享代理类。该类中的所有方法都在这里可用class DataClient(object):    def __init__(self, port):        #assert DataClient._checkForProcess('DataServer.py'), 'Must have DataServer running'        class myManager(BaseManager): pass        myManager.register('DataProxy')        self.mgr = myManager(address=('localhost', port), authkey='DataProxy01'.encode())        self.mgr.connect()        self.proxy = self.mgr.DataProxy()    # 验证服务器正在运行 (非必须的)    @staticmethod    def _checkForProcess(name):        for proc in psutil.process_iter():            print(proc.name())            if proc.name() == name:                                return True        return False

3)使用示例

先运行DataServer.py,然后运行保存的下面代码,如下,

#!/usr/bin/pythonimport timeimport multiprocessing as mpimport numpyfrom   DataClient import *    # “代理”对每个子进程都是全局的,# 不是在所有进程之间共享的gProxy = NonegMode  = NonegDummy = Nonedef init(port, mode):    global gProxy, gMode, gDummy    gProxy  = DataClient(port).proxy    gMode  = mode    gDummy = numpy.random.rand(1000)     print('Init proxy ', id(gProxy), 'in ', mp.current_process())def worker(key):    global gProxy, gMode, gDummy    if 0 == gMode:   # 从代理获取        array = gProxy.getData(key)        print(array)    elif 1 == gMode: # 测试区别        array = gDummy    else: assert 0, 'unknown mode: %s' % gMode if __name__ == '__main__':    port   = 5000    maxkey = 1000    numpts = 100                       for mode in [1, 0]:        for nprocs in [16, 1]:            if 0==mode: print('使用 client/server %d processes' % nprocs)            if 1==mode: print('使用 local data %d processes' % nprocs)            pool = mp.Pool(nprocs, initializer=init, initargs=(port,mode))            start = time.time()            ret_data = pool.map(worker,[1],chunksize=1)            print('took %4.3f seconds' % (time.time()-start))            pool.close()