Menu

6.10. 执行效率

file

CPython 作为最流行的 Python 环境,对于 CPU 密集型任务较慢,而 PyPy 则较快。

使用稍作改动的 David Beazley 的 CPU 密集测试代码(增加了多重循环进行多轮测试), 可以看到 CPython 与 PyPy 之间的执行差距。

# PyPy
$ ./pypy -V
Python 2.7.1 (7773f8fc4223, Nov 18 2011, 18:47:10)
[PyPy 1.7.0 with GCC 4.4.3]
$ ./pypy measure2.py
0.0683999061584
0.0483210086823
0.0388588905334
0.0440690517426
0.0695300102234
# CPython
$ ./python -V
Python 2.7.1
$ ./python measure2.py
1.06774401665
1.45412397385
1.51485204697
1.54693889618
1.60109114647

运行环境

The GIL

GIL  (Global interpreter lock 全局解释器锁) 是 Python 支持多线程并行操作的方式。 Python 的内存管理不是完全线程安全的,所以 GIL 被创造出来避免多线程同时运行同一个 Python 代码。

David Beazley 有一个关于 GIL 如何工作的 指南 。他也讨论了 Python3.2 中的 新 GIL  他的结论是为了最大化一个 Python 程序的性能,应该对 GIL 工作方式有一个深刻的理解 —— 它如何影响您的特定程序,您拥有多少核,以及您程序瓶颈在哪。

C 扩展

当写一个C扩展时必须 特别关注 在解释器中注册您的线程。

GIL 全局锁

当写一个 C 扩展时必须 重点注意 在解释器中注册你的线程。

C 扩展

Cython

Cython  是 Python 语言的一个超集,它允许你为 Python 写 C 或 C++ 模块。Cython 也使得你能够从已编译的 C 库中调用函数。使用 Cython 让您得以发挥 Python 的变量与操作的强类型优势。

下面是一个 Cython 中的强类型例子:

def primes(int kmax):
"""Calculation of prime numbers with additional
Cython keywords"""

    cdef int n, k, i
    cdef int p[1000]
    result = []
    if kmax > 1000:
        kmax = 1000
    k = 0
    n = 2
    while k < kmax:
        i = 0
        while i < k and n % p[i] != 0:
            i = i + 1
        if i == k:
            p[k] = n
            k = k + 1
            result.append(n)
        n = n + 1
    return result

请尝试着将这个『附加关键字寻找素数算法』的 Cython 实现与下面这个纯 Python 实现比较:

def primes(kmax):
"""Calculation of prime numbers in standard Python syntax"""

    p = range(1000)
    result = []
    if kmax > 1000:
        kmax = 1000
    k = 0
    n = 2
    while k < kmax:
        i = 0
        while i < k and n % p[i] != 0:
            i = i + 1
        if i == k:
            p[k] = n
            k = k + 1
            result.append(n)
        n = n + 1
    return result

注意在 Cython 版本里,创建一个 Python 列表时,声明了会被编译为 C 类型的整型和整型数组:

def primes(int kmax):
    """Calculation of prime numbers with additional
    Cython keywords"""

    cdef int n, k, i
    cdef int p[1000]
    result = []
def primes(kmax):
    """Calculation of prime numbers in standard Python syntax"""

    p = range(1000)
    result = []

它们之间有什么差别呢?在上面的 Cython 版本中,您可以看到变量类型与整型数组像标准 C 一样被声明。 在上面的例子中,第三行的 cdef int n,k,i 这个附加类型声明(整型)使得 Cython 编译器得以产生比 第二个版本更有效率的 C 代码。标准 Python 代码以 .py 格式保存,而 Cython 以 .pyx 格式保存。

速度上有什么差异呢?看看这个:

import time
#activate pyx compiler
import pyximport
pyximport.install()
#primes implemented with Cython
import primesCy
#primes implemented with Python
import primes

print "Cython:"
t1= time.time()
print primesCy.primes(500)
t2= time.time()
print "Cython time: %s" %(t2-t1)
print ""
print "Python"
t1= time.time()
print primes.primes(500)
t2= time.time()
print "Python time: %s" %(t2-t1)

这两行代码需要一些说明:

import pyximport
pyximport.install()

pyximport 使得你可以导入 .pyx 文件,(像 primesCy.pyx 这样的)。 pyximport.install() 命令使 Python 解释器可以打开 Cython 编译器直接编译出 .so 格式的 C 库。Cython之后可以导入这个库到您的 Python 代码中,简便而有效。使用 time.time() 函数您可以比较两个不同的在查找 500 个素数的调用长的时间消耗差异。在一个标准笔记本中 (双核AMD E-450 1.6GHz),测量值是这样的:

Cython time: 0.0054 seconds

Python time: 0.0566 seconds

而这个是嵌入的 ARM beaglebone 机的输出结果:

Cython time: 0.0196 seconds

Python time: 0.3302 seconds

Pyrex

Shedskin?

并行运算

Concurrent.futures

concurrent.futures 模块是 Python 标准库中的一个模块,它提供了一个 『用于异步调用的高级接口』。 它抽象和简化了许多使用多个线程或进程并发的复杂细节,并允许你更专注于完成手头的任务。

concurrent.futures 模块提供了两个主要的类,即 ThreadPoolExecutor 和 ProcessPoolExecutor 。 ThreadPoolExecutor 将创建一个用户可以提交作业的工作线程池。当下一个工作线程可用时, 这些作业将在另一个线程中执行。

ProcessPoolExecutor 以相同的方式工作,它使用多进程而不是多线程作为工作池。这就可以避开 GIL 的问题,但是由于传递参数给工作进程的机制限制,只有可被 Pickle 序列化的对象才能够被执行并返回。

由于 GIL 的工作原理,一个较好的经验法则是当执行涉及很多阻塞(如通过网络发出请求)的任务时,使用 ThreadPoolExecutor ,而对高计算开销的任务使用 ProcessPoolExecutor 执行器。

使用两个执行器并行执行有两个主要方法。一个是使用 map(func, iterables) 方法。 这个函数除了能并行执行一切,它几乎和内置的 map() 函数一模一样 :

from concurrent.futures import ThreadPoolExecutor
import requests

def get_webpage(url):
    page = requests.get(url)
    return page

pool = ThreadPoolExecutor(max_workers=5)

my_urls = ['http://google.com/']*10  # 创建 url 列表 list 

for page in pool.map(get_webpage, my_urls):
    # 处理运行结果
    print(page.text)

为了进一步的控制,submit(func, *args, **kwargs) 方法将调度一个可执行的调用 (如 func(*args, **kwargs) ),并返回一个代表可调用的 Future 执行对象。

Future 对象提供了可用于检查计划可调用进程的各种方法。这些包括:

cancel()

尝试取消调用。

cancelled()

如果调用被成功取消,返回 True。

running()

如果当前正在执行调用而且没被取消,则返回 True。

done()

如果调用被成功取消或完成运行,返回 True。

result()

返回调用返回的值。请注意,此调用将阻塞以至可调用对象有结果返回。

exception()

返回调用抛出的异常。如果没有抛出异常,将返回 None。请注意,这和 result() 一样会阻塞。

add_done_callback(fn)

添加回调函数,并在所调用的可调用对象返回结果时执行(如 fn(future) )。

from concurrent.futures import ProcessPoolExecutor, as_completed

def is_prime(n):
    if n % 2 == 0:
        return n, False

    sqrt_n = int(n**0.5)
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return n, False
    return n, True

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

futures = []
with ProcessPoolExecutor(max_workers=4) as pool:
    # Schedule the ProcessPoolExecutor to check if a number is prime
    # and add the returned Future to our list of futures
    for p in PRIMES:
        fut = pool.submit(is_prime, p)
        futures.append(fut)

# As the jobs are completed, print out the results
for number, result in as_completed(futures):
    if result:
        print("{} is prime".format(number))
    else:
        print("{} is not prime".format(number))

concurrent.futures 模块包含两个帮助函数来处理 Futures。as_completed(futures) 函数 返回 futures 列表的的迭代器,在 futures 结束时 yield

而 wait(futures) 函数则简单地阻塞,直到列表中所有的 futures 完成。

有关使用 concurrent.futures 模块的更多信息,请参阅官方文档。

Threading

标准库带有一个 threading 模块,允许用户手动处理多个线程。

在另一个线程中运行一个函数是一件非常简单的事情,传参一个可执行对象到 Thread 的构造函数中, 然后调用 start() 即可:

from threading import Thread
import requests

def get_webpage(url):
    page = requests.get(url)
    return page

some_thread = Thread(get_webpage, 'http://google.com/')
some_thread.start()

调用 join() 来等待线程终止:

some_thread.join()

调用 join() 后,建议要检查下线程是否仍然存活(因为 join 调用超时):

if some_thread.is_alive():
    print("join() must have timed out.")
else:
    print("Our thread has terminated.")

由于多个线程可以访问相同的内存部分,有时可能会出现两个或多个线程尝试同时写入同一资源的情况, 或者输出取决于某些事件的顺序或时序。 这被称为 竞争条件 。当这种情况发生时, 输出将会出现乱码,或者可能会遇到难以调试的问题。 stackoverflow 上的一个文章 是个很好的例子。

一个比较好的解决方法是在每个线程写入共享资源之前获取 Lock 。 锁可以通过环境上下文协议 ( with 语句)或直接使用 acquire() 和 release() 来获取和释放。 以下是一个(颇有争议的)例子:

from threading import Lock, Thread

file_lock = Lock()

def log(msg):
    with file_lock:
        open('website_changes.log', 'w') as f:
            f.write(changes)

def monitor_website(some_website):
    """
        监控网站,如果发生变化就写 log 到硬盘上
    """
    while True:
        changes = check_for_changes(some_website)
        if changes:
            log(changes)

websites = ['http://google.com/', ... ]
for website in websites:
    t = Thread(monitor_website, website)
    t.start()

在这里,我们有一堆线程检查站点列表中的更改,每当有任何更改时,它们尝试通过调用 log(changes) 将这些更改写入文件。 当调用 log() 时,它在 with file_lock: 处等待获取锁。 这样可以确保在任何时候只有一个线程正在写入文件。

本文章首发在 PythonCaff
上一篇 下一篇
讨论数量: 0
发起讨论


暂无话题~