假设我们必须为多线程任务创建大量线程.由于线程太多,因此可能存在许多性能问题,这在计算上是最昂贵的.一个主要问题可能是吞吐量受限.我们可以通过创建一个线程池来解决这个问题.线程池可以被定义为预先实例化和空闲线程的组,其准备好被给予工作.当我们需要执行大量任务时,创建线程池优先于为每个任务实例化新线程.线程池可以管理大量线程的并发执行,如下所示;
如果线程池中的线程完成它的执行然后该线程可以被重用.
如果一个线程被终止,将创建另一个线程来替换该线程.
Python模块 - Concurrent.futures
Python标准库包含 concurrent.futures 模块.该模块在Python 3.2中添加,为开发人员提供了用于启动异步任务的高级接口.它是Python的线程和多处理模块之上的抽象层,用于提供使用线程池或进程池运行任务的接口.
在接下来的部分中,我们将了解concurrent.futures模块的不同类.
执行者类
执行者是的抽象类concurrent.futures Python模块.它不能直接使用,我们需要使用以下具体子类之一 :
ThreadPoolExecutor
ProcessPoolExecutor
ThreadPoolExecutor - 具体子类
它是Executor类的具体子类之一.子类使用多线程,我们获得了一个用于提交任务的线程池.此池将任务分配给可用线程并安排它们运行.
如何创建ThreadPoolExecutor?
在并发的帮助下.futures 模块及其具体子类 Executor ,我们可以轻松创建一个线程池.为此,我们需要构造一个 ThreadPoolExecutor ,其中包含我们在池中想要的线程数.默认情况下,该数字为5.然后我们可以向线程池提交任务.当我们提交()任务时,我们会返回 Future . Future对象有一个名为 done()的方法,它告诉我们未来是否已经解决.有了这个,就为该特定的未来对象设置了一个值.当任务完成时,线程池执行器将值设置为future对象.
示例
from concurrent.futures import ThreadPoolExecutorfrom time import sleepdef task(message): sleep(2) return messagedef main(): executor = ThreadPoolExecutor(5) future = executor.submit(task, ("Completed")) print(future.done()) sleep(2) print(future.done()) print(future.result())if __name__ == '__main__':main()
输出
FalseTrueCompleted
在上面的例子中, ThreadPoolExecutor 构造了5个线程.然后,在给出消息之前等待2秒的任务被提交给线程池执行器.从输出中可以看出,任务直到2秒才完成,因此第一次调用 done()将返回False. 2秒后,任务完成,我们通过调用 result()方法得到未来的结果.
实例化ThreadPoolExecutor - Context Manager
实例化 ThreadPoolExecutor 的另一种方法是在上下文管理器的帮助下.它的工作方式类似于上例中使用的方法.使用上下文管理器的主要优点是它在语法上看起来很好.实例化可以在以下代码的帮助下完成;
,ThreadPoolExecutor(max_workers = 5)作为执行者
示例
以下示例是从Python文档中借用的.在此示例中,首先必须导入 concurrent.futures 模块.然后创建一个名为 load_url()的函数,它将加载请求的URL.然后,该函数使用池中的5个线程创建 ThreadPoolExecutor
. ThreadPoolExecutor 已被用作上下文管理器.我们可以通过调用 result()方法获得未来的结果.
import concurrent.futuresimport urllib.requestURLS = ['https://img01.yuandaxia.cn/Content/img/tutorials/concurrency_in_python/', 'https://img01.yuandaxia.cn/Content/img/tutorials/concurrency_in_python/', 'http://europe.wsj.com/', 'http://www.bbc.co.uk/', 'https://img01.yuandaxia.cn/Content/img/tutorials/concurrency_in_python/']def load_url(url, timeout): with urllib.request.urlopen(url, timeout = timeout) as conn: return conn.read()with concurrent.futures.ThreadPoolExecutor(max_workers = 5) as executor: future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as exc: print('%r generated an exception: %s' % (url, exc)) else: print('%r page is %d bytes' % (url, len(data)))
输出
以下是上述Python脚本的输出 :
'https://img01.yuandaxia.cn/Content/img/tutorials/concurrency_in_python/' generated an exception:'https://img01.yuandaxia.cn/Content/img/tutorials/concurrency_in_python/' page is 229313 bytes'https://img01.yuandaxia.cn/Content/img/tutorials/concurrency_in_python/' page is 168933 bytes'http://www.bbc.co.uk/' page is 283893 bytes'http://europe.wsj.com/' page is 938109 bytes
使用Executor. map()函数
Python map()函数广泛用于许多任务中.一个这样的任务是将特定函数应用于迭代中的每个元素.类似地,我们可以将迭代器的所有元素映射到函数,并将它们作为独立的作业提交给 ThreadPoolExecutor .请考虑以下Python脚本示例,以了解该函数的工作原理.
示例
在下面的示例中,map函数用于应用 square()函数到values数组中的每个值.
from concurrent.futures import ThreadPoolExecutorfrom concurrent.futures import as_completedvalues = [2,3,4,5]def square(n): return n * ndef main(): with ThreadPoolExecutor(max_workers = 3) as executor: results = executor.map(square, values)for result in results: print(result)if __name__ == '__main__': main()
输出
上述Python脚本生成以下输出 :
4 9 16 25