簡述
PEP492引入了對Python 3.5的原生協(xié)程和async/await句法的支持。本次提案添加了對異步生成器的支持進而來擴展Python的異步功能。
理論和目標
常規(guī)生成器(在PEP 255中引入)的實現(xiàn),使得編寫復雜數(shù)據(jù)變得更優(yōu)雅,它們的行為類似于迭代器。
當時沒有提供async for使用的異步生成器。 編寫異步數(shù)據(jù)生成器變得非常復雜,因為必須定義一個實現(xiàn)__aiter__和__anext__的方法,才能在async for語句中使用它。
為了說明異步生成器的重要性,專門做了性能測試,測試結果表明使用異步生成器要比使用異步迭代器快2倍多。
下面的代碼是演示了在迭代的過程中等待幾秒
class Ticker:
'''Yield numbers from 0 to `to` every `delay` seconds.'''
def __init__(self, delay, to):
self.delay = delay
self.i = 0
self.to = to
def __aiter__(self):
return self
async def __anext__(self):
i = self.i
if i >= self.to:
raise StopAsyncIteration
self.i += 1
if i:
await asyncio.sleep(self.delay)
return i
我們那可以使用下面的代碼實現(xiàn)同樣的功能:
async def ticker(delay, to):
'''Yield numbers from 0 to `to` every `delay` seconds.'''
for i in range(to):
yield i
await asyncio.sleep(delay)
詳細說明
異步生成器
我們直到在函數(shù)中使用一個或多個yield該函數(shù)將變成一個生成器。
def func(): # 方法
return
def genfunc(): # 生成器方法
yield
我們提議使用類似的功能實現(xiàn)下面異步生成器:
async def coro(): # 一個協(xié)程方法
await smth()
async def asyncgen(): # 一個異步生成器方法
await smth()
yield 42
調用異步生成器函數(shù)的結果是異步生成器對象,它實現(xiàn)了PEP 492中定義的異步迭代協(xié)議。
注意:在異步生成器中使用非空return語句會引發(fā)SyntaxError錯誤。
對異步迭代協(xié)議的支持
該協(xié)議需要實現(xiàn)兩種特殊方法:
__aiter__方法返回一個異步迭代器。
__anext__方法返回一個awaitable對象,它使用StopIteration異常來捕獲yield的值,使用StopAsyncIteration異常來表示迭代結束。
異步生成器定義了這兩種方法。 讓我們實現(xiàn)一個一個簡單的異步生成器:
import asyncio
async def genfunc():
yield 1
yield 2
gen = genfunc()
async def start():
assert gen.__aiter__() is gen
assert await gen.__anext__() == 1
assert await gen.__anext__() == 2
await gen.__anext__() # This line will raise StopAsyncIteration.
if __name__ == '__main__':
asyncio.run(start())
終止
PEP 492提到需要使用事件循環(huán)或調度程序來運行協(xié)程。 因為異步生成器是在協(xié)程使用的,所以還需要創(chuàng)建一個事件循環(huán)來運行。
異步生成器可以有try..finally塊,也可以用async with異步上下文管理代碼快。 重要的是提供一種保證,即使在部分迭代時,也可以進行垃圾收集,生成器可以安全終止。
async def square_series(con, to):
async with con.transaction():
cursor = con.cursor(
'SELECT generate_series(0, $1) AS i', to)
async for row in cursor:
yield row['i'] ** 2
async for i in square_series(con, 1000):
if i == 100:
break
上面代碼演示了異步生成器在async with中使用,然后使用async for對異步生成器對象進行迭代處理,同時我們也可以設置一個中斷條件。
square_series()生成器將被垃圾收集,并沒有異步關閉生成器的機制,Python解釋器將無法執(zhí)行任何操作。
為了解決這個問題,這里提出以下改進建議:
1.在異步生成器上實現(xiàn)一個aclose方法,返回一個特殊awaittable 對象。 當awaitable拋出GeneratorExit異常的時候,拋出到掛起的生成器中并對其進行迭代,直到發(fā)生GeneratorExit或StopAsyncIteration。這就是在常規(guī)函數(shù)中使用close方法關閉對象一樣,只不過aclose需要一個事件循環(huán)去執(zhí)行。
2.不要在異步生成器中使用yield語句,只能用await。
3.在sys模塊中加兩個方法:set_asyncgen_hooks() and get_asyncgen_hooks().
sys.set_asyncgen_hooks()背后的思想是允許事件循環(huán)攔截異步生成器的迭代和終結,這樣最終用戶就不需要關心終結問題了,一切正常。
sys.set_asyncgen_hooks() 可以結束兩個參數(shù)
firstiter:一個可調用的,當?shù)谝淮蔚惒缴善鲿r將調用它。
finalizer:一個可調用的,當異步生成器即將被GC時將被調用。
當?shù)谝坏惒缴善鲿r,它會引用到當前的finalizer。
當異步生成器即將被垃圾收集時,它會調用其緩存的finalizer。假想在事件循環(huán)激活異步生成器開始迭代的時候,finalizer將調用一個aclose()方法.
例如,以下是如何修改asyncio以允許安全地完成異步生成器:
# asyncio/base_events.py
class BaseEventLoop:
def run_forever(self):
...
old_hooks = sys.get_asyncgen_hooks()
sys.set_asyncgen_hooks(finalizer=self._finalize_asyncgen)
try:
...
finally:
sys.set_asyncgen_hooks(*old_hooks)
...
def _finalize_asyncgen(self, gen):
self.create_task(gen.aclose())
第二個參數(shù)firstiter,允許事件循環(huán)維護在其控制下實例化的弱異步生成器集。這使得可以實現(xiàn)“shutdown”機制,來安全地打開的生成器并關閉事件循環(huán)。
sys.set_asyncgen_hooks()是特定線程,因此在多個事件循環(huán)并行的時候是安全的。
sys.get_asyncgen_hooks()返回一個帶有firstiter和finalizer字段的類似于類的結構。
asyncio
asyncio事件循環(huán)將使用sys.set_asyncgen_hooks()API來維護所有被調度的弱異步生成器,并在生成器被垃圾回收時侯調度它們的aclose()方法。
為了確保asyncio程序可以可靠地完成所有被調度的異步生成器,我們建議添加一個新的事件循環(huán)協(xié)程方法loop.shutdown_asyncgens()。 該方法將使用aclose()調用關閉所有當前打開的異步生成器。
在調用loop.shutdown_asyncgens()方法之后,首次迭代新的異步生成器,事件循環(huán)就會發(fā)出警告。 我們的想法是,在請求關閉所有異步生成器之后,程序不應該執(zhí)行迭代新異步生成器的代碼。
下面是一個關于如何使用Ashutdown_asyncgens的例子:
try:
loop.run_forever()
finally:
loop.run_until_complete(loop.shutdown_asyncgens())#關閉所有異步迭代器
loop.close()
異步生成器對象
該對象以標準Python生成器對象為模型。 本質上異步生成器的行為復制了同步生成器的行為,唯一的區(qū)別在于API是異步的。
定義了以下方法和屬性:
1.agen.__aiter__(): 返回agen.
2.agen.__anext__(): 返回一個awaitable對象, 調用一次異步生成器的元素。
3.agen.asend(val): 返回一個awaitable對象,它在agen生成器中推送val對象。 當agen還沒迭代時,val必須為None。
上面的方法類似同步生成器的使用。
代碼例子:
import asyncio
async def gen():
await asyncio.sleep(0.1)
v = yield 42
print(v)
await asyncio.sleep(0.2)
async def start():
g = gen()
await g.asend(None) # Will return 42 after sleeping
# for 0.1 seconds.
await g.asend('hello') # Will print 'hello' and
# raise StopAsyncIteration
# (after sleeping for 0.2 seconds.)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(start())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
4.agen.athrow(typ, [val, [tb]]): 返回一個awaitable對象, 這會向agen生成器拋出一個異常。
代碼如下:
import asyncio
async def gen():
try:
await asyncio.sleep(0.1)
yield 'hello'
except IndexError:
await asyncio.sleep(0.2)
yield 'world'
async def start():
g = gen()
v = await g.asend(None)
print(v) # Will print 'hello' after
# sleeping for 0.1 seconds.
v = await g.athrow(IndexError)
print(v) # Will print 'world' after
# $ sleeping 0.2 seconds.
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(start())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
5.agen.aclose(): 返回一個awaitable對象, 調用該方法會拋出一個異常給生成器。
import asyncio
async def gen():
try:
await asyncio.sleep(0.1)
v = yield 42
print(v)
await asyncio.sleep(0.2)
except:
print('運行結束')
async def start():
g = gen()
v=await g.asend(None)
print(v)
await g.aclose() #不做異常處理會報錯
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(start())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
6.agen.__name__ and agen.__qualname__:可以返回異步生成器函數(shù)的名字。
async def gen():
try:
await asyncio.sleep(0.1)
v = yield 42
print(v)
await asyncio.sleep(0.2)
except:
print('運行結束')
async def start():
g = gen()
print(g.__aiter__())#輸出async_generator對象
print(g.__name__)#輸出gen
print(g.__qualname__)#輸出gen
其他的方法
agen.ag_await: 正等待的對象(None). 類似當前可用的 gi_yieldfrom for generators and cr_await for coroutines.
agen.ag_frame, agen.ag_running, and agen.ag_code: 同生成器一樣
StopIteration and StopAsyncIteration 被替換為 RuntimeError,并且不上拋。
源碼實現(xiàn)細節(jié)
異步生成器對象(PyAsyncGenObject)與PyGenObject共享結構布局。 除此之外,參考實現(xiàn)還引入了三個新對象:
PyAsyncGenASend:實現(xiàn)__anext__和asend()方法的等待對象。
PyAsyncGenAThrow:實現(xiàn)athrow()和aclose()方法的等待對象。
PyAsyncGenWrappedValue:來自異步生成器的每個直接生成的對象都隱式地裝入此結構中。 這就是生成器實現(xiàn)如何使用常規(guī)迭代協(xié)議從使用異步迭代協(xié)議生成的對象中分離出的對象。PyAsyncGenASend和PyAsyncGenAThrow是awaitable對象(它們有__await_方法返回self)類似于coroutine的對象(實現(xiàn)__iter__,__ next__,send()和throw()方法)。 本質上,它們控制異步生成器的迭代方式
PyAsyncGenASend and PyAsyncGenAThrow
PyAsyncGenASend類似生成器對象驅動__anext__ and asend() 方法,實裝了異步迭代協(xié)議。
agen.asend(val) 和agen.__anext__() 返回一個PyAsyncGenASend對象的一個引用。 (它將引用保存回父類agen對象。)
數(shù)據(jù)流定義如下:
1.首次調用PyAsyncGenASend.send(val)時, val將推入到父類agen對象 (PyGenObject利用現(xiàn)有對象。)
對PyAsyncGenASend對象進行后續(xù)迭代,將None推送到agen。
2.首次調用_PyAsyncGenWrappedValue對象時,它將被拆箱,并且以未被裝飾的值作為參數(shù)會引發(fā)StopIteration異常。
3.異步生成器中的return語句引發(fā)StopAsyncIteration異常,該異常通過PyAsyncGenASend.send()和PyAsyncGenASend.throw()方法傳播。
4.PyAsyncGenAThrow與PyAsyncGenASend非常相似。 唯一的區(qū)別是PyAsyncGenAThrow.send()在第一次調用時會向父類agen對象拋出異常(而不是將值推入其中。)
新的標準庫方法和Types
1.types.AsyncGeneratorType -- 判斷是否是異步生成器對象
2.sys.set_asyncgen_hooks()和 sys.get_asyncgen_hooks()--
在事件循環(huán)中設置異步生成器終結器和迭代攔截器。
3.inspect.isasyncgen()和 inspect.isasyncgenfunction() :方法內省。
4.asyncio加入新方法:loop.shutdown_asyncgens().
5.collections.abc.AsyncGenerator:抽象基類的添加。
是否支持向后兼容
該提案完全支持向后兼容
在python3.5,async def里使用yield會報錯,因此在python3.6引入了安全的異步生成器
性能展示
常規(guī)生成器
import time
def gen():
i = 0
while i < 100000000:
yield i
i += 1
if __name__ == '__main__':
start = time.time()
list(gen())
end = time.time()
print('totals time', end - start)
輸出
totals time 14.837260007858276
15s左右
異步迭代器的改進
import time
import asyncio
N = 10 ** 7
class AIter:
def __init__(self):
self.i = 0
def __aiter__(self):
return self
async def __anext__(self):
i = self.i
if i >= N:
raise StopAsyncIteration
self.i += 1
return i
async def start():
[_ async for _ in AIter()]
if __name__ == '__main__':
s=time.time()
loop=asyncio.get_event_loop()
try:
loop.run_until_complete(start())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
e=time.time()
print('total time',e-s)
輸出
total time 5.441649913787842
很明顯迭代異步生成器的速度比迭代普通生成器不只是快了兩倍。
我們可以做一個更簡單的異步生成器
import time
import asyncio
async def ticker(delay, to):
'''Yield numbers from 0 to `to` every `delay` seconds.'''
for i in range(to):
yield i
await asyncio.sleep(delay)
async def start():
async for item in ticker(0.000001,100):
print(item)
if __name__ == '__main__':
s=time.time()
loop=asyncio.get_event_loop()
try:
loop.run_until_complete(start())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
e=time.time()
print('total time',e-s)
設計中要注意的事項
內建函數(shù):aiter() and anext()
最初,PEP 492將__aiter__定義為應返回等待對象的方法,從而產(chǎn)生異步迭代器。
但是,在CPython 3.5.2中,重新定義了__aiter__可以直接返回異步迭代器。
為了避免破壞向后兼容性,決定Python 3.6將支持兩種方式:__aiter__仍然可以在發(fā)出DeprecationWarning時返回等待狀態(tài)。由于Python 3.6中__aiter__的這種雙重性質,我們無法添加內置的aiter()的同步實現(xiàn)。 因此,建議等到Python 3.7。
異步list/dict/set 推導式
將放在單獨的pep中也就是后來的pep530.
異步y(tǒng)ield from
對于異步生成器,yield from也不那么重要,因為不需要提供在協(xié)程之上實現(xiàn)另一個協(xié)同程序協(xié)議的機制。為了組合異步生成器,可以使用async for簡化這個過程:
async def g1():
yield 1
yield 2
async def g2():
async for v in g1():
yield v
為了asend()和athrow()是必須的
它們可以使用異步生成器實現(xiàn)類似于contextlib.contextmanager的概念。 例如,可以實現(xiàn)以下模式:
@async_context_manager
async def ctx():
await open()
try:
yield
finally:
await close()
async with ctx():
await ...
另一個原因是從__anext__對象返回的對象來推送數(shù)據(jù)并將異常拋出到異步生成器中,很難正確地執(zhí)行此操作。 添加顯式的asend()和athrow()更獲取異常后的數(shù)據(jù)。
在實現(xiàn)方面,asend()是__anext__更通用的版本,而athrow()與aclose()非常相似。 因此,為異步生成器定義這些方法不會增加任何額外的復雜性。
代碼示例
async def ticker(delay, to):
for i in range(to):
yield i
await asyncio.sleep(delay)
async def run():
async for i in ticker(1, 10):
print(i)
import asyncio
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(run())
finally:
loop.close()
這代碼將打出0-9,每個數(shù)字之間的間隔為1s。
提議者
Guido, 2016年9月6日
參考資料
[1] https://github.com/1st1/cpython/tree/async_gen
[2] https://mail.python.org/pipermail/python-dev/2016-September/146267.html
[3] http://bugs.python.org/issue28003