日韩黑丝制服一区视频播放|日韩欧美人妻丝袜视频在线观看|九九影院一级蜜桃|亚洲中文在线导航|青草草视频在线观看|婷婷五月色伊人网站|日本一区二区在线|国产AV一二三四区毛片|正在播放久草视频|亚洲色图精品一区

分享

PEP 525 --異步生成器

 鷹兔牛熊眼 2019-01-25

簡述

PEP492引入了對Python 3.5的原生協(xié)程和async/await句法的支持。本次提案添加了對異步生成器的支持進而來擴展Python的異步功能。

理論和目標

常規(guī)生成器(在PEP 255中引入)的實現(xiàn),使得編寫復雜數(shù)據(jù)變得更優(yōu)雅,它們的行為類似于迭代器。
當時沒有提供async for使用的異步生成器。 編寫異步數(shù)據(jù)生成器變得非常復雜,因為必須定義一個實現(xiàn)__aiter__和__anext__的方法,才能在async for語句中使用它。
為了說明異步生成器的重要性,專門做了性能測試,測試結果表明使用異步生成器要比使用異步迭代器快2倍多。
下面的代碼是演示了在迭代的過程中等待幾秒

class Ticker:
    '''Yield numbers from 0 to `to` every `delay` seconds.'''

    def __init__(self, delay, to):
        self.delay = delay
        self.i = 0
        self.to = to

    def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        if i >= self.to:
            raise StopAsyncIteration
        self.i += 1
        if i:
            await asyncio.sleep(self.delay)
        return i

我們那可以使用下面的代碼實現(xiàn)同樣的功能:

async def ticker(delay, to):
    '''Yield numbers from 0 to `to` every `delay` seconds.'''
    for i in range(to):
        yield i
        await asyncio.sleep(delay)

詳細說明

異步生成器

我們直到在函數(shù)中使用一個或多個yield該函數(shù)將變成一個生成器。

def func():            # 方法
    return

def genfunc():         # 生成器方法
    yield

我們提議使用類似的功能實現(xiàn)下面異步生成器:

async def coro():      # 一個協(xié)程方法
    await smth()

async def asyncgen():  # 一個異步生成器方法
    await smth()
    yield 42

調用異步生成器函數(shù)的結果是異步生成器對象,它實現(xiàn)了PEP 492中定義的異步迭代協(xié)議。
注意:在異步生成器中使用非空return語句會引發(fā)SyntaxError錯誤。

對異步迭代協(xié)議的支持

該協(xié)議需要實現(xiàn)兩種特殊方法:
__aiter__方法返回一個異步迭代器。
__anext__方法返回一個awaitable對象,它使用StopIteration異常來捕獲yield的值,使用StopAsyncIteration異常來表示迭代結束。
異步生成器定義了這兩種方法。 讓我們實現(xiàn)一個一個簡單的異步生成器:

import asyncio
async def genfunc():
    yield 1
    yield 2

gen = genfunc()

async def start():
    assert gen.__aiter__() is gen
    assert await gen.__anext__() == 1
    assert await gen.__anext__() == 2
    await gen.__anext__()  # This line will raise StopAsyncIteration.

if __name__ == '__main__':
    asyncio.run(start())

終止

PEP 492提到需要使用事件循環(huán)或調度程序來運行協(xié)程。 因為異步生成器是在協(xié)程使用的,所以還需要創(chuàng)建一個事件循環(huán)來運行。
異步生成器可以有try..finally塊,也可以用async with異步上下文管理代碼快。 重要的是提供一種保證,即使在部分迭代時,也可以進行垃圾收集,生成器可以安全終止。

async def square_series(con, to):
    async with con.transaction():
        cursor = con.cursor(
            'SELECT generate_series(0, $1) AS i', to)
        async for row in cursor:
            yield row['i'] ** 2

async for i in square_series(con, 1000):
    if i == 100:
        break

上面代碼演示了異步生成器在async with中使用,然后使用async for對異步生成器對象進行迭代處理,同時我們也可以設置一個中斷條件。
square_series()生成器將被垃圾收集,并沒有異步關閉生成器的機制,Python解釋器將無法執(zhí)行任何操作。
為了解決這個問題,這里提出以下改進建議:
1.在異步生成器上實現(xiàn)一個aclose方法,返回一個特殊awaittable 對象。 當awaitable拋出GeneratorExit異常的時候,拋出到掛起的生成器中并對其進行迭代,直到發(fā)生GeneratorExit或StopAsyncIteration。這就是在常規(guī)函數(shù)中使用close方法關閉對象一樣,只不過aclose需要一個事件循環(huán)去執(zhí)行。
2.不要在異步生成器中使用yield語句,只能用await。
3.在sys模塊中加兩個方法:set_asyncgen_hooks() and get_asyncgen_hooks().
sys.set_asyncgen_hooks()背后的思想是允許事件循環(huán)攔截異步生成器的迭代和終結,這樣最終用戶就不需要關心終結問題了,一切正常。
sys.set_asyncgen_hooks() 可以結束兩個參數(shù)
firstiter:一個可調用的,當?shù)谝淮蔚惒缴善鲿r將調用它。
finalizer:一個可調用的,當異步生成器即將被GC時將被調用。
當?shù)谝坏惒缴善鲿r,它會引用到當前的finalizer。
當異步生成器即將被垃圾收集時,它會調用其緩存的finalizer。假想在事件循環(huán)激活異步生成器開始迭代的時候,finalizer將調用一個aclose()方法.
例如,以下是如何修改asyncio以允許安全地完成異步生成器:

# asyncio/base_events.py

class BaseEventLoop:

    def run_forever(self):
        ...
        old_hooks = sys.get_asyncgen_hooks()
        sys.set_asyncgen_hooks(finalizer=self._finalize_asyncgen)
        try:
            ...
        finally:
            sys.set_asyncgen_hooks(*old_hooks)
            ...

    def _finalize_asyncgen(self, gen):
        self.create_task(gen.aclose())

第二個參數(shù)firstiter,允許事件循環(huán)維護在其控制下實例化的弱異步生成器集。這使得可以實現(xiàn)“shutdown”機制,來安全地打開的生成器并關閉事件循環(huán)。
sys.set_asyncgen_hooks()是特定線程,因此在多個事件循環(huán)并行的時候是安全的。
sys.get_asyncgen_hooks()返回一個帶有firstiter和finalizer字段的類似于類的結構。

asyncio

asyncio事件循環(huán)將使用sys.set_asyncgen_hooks()API來維護所有被調度的弱異步生成器,并在生成器被垃圾回收時侯調度它們的aclose()方法。
為了確保asyncio程序可以可靠地完成所有被調度的異步生成器,我們建議添加一個新的事件循環(huán)協(xié)程方法loop.shutdown_asyncgens()。 該方法將使用aclose()調用關閉所有當前打開的異步生成器。
在調用loop.shutdown_asyncgens()方法之后,首次迭代新的異步生成器,事件循環(huán)就會發(fā)出警告。 我們的想法是,在請求關閉所有異步生成器之后,程序不應該執(zhí)行迭代新異步生成器的代碼。
下面是一個關于如何使用Ashutdown_asyncgens的例子:

try:
    loop.run_forever()
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())#關閉所有異步迭代器
    loop.close()

異步生成器對象

該對象以標準Python生成器對象為模型。 本質上異步生成器的行為復制了同步生成器的行為,唯一的區(qū)別在于API是異步的。
定義了以下方法和屬性:
1.agen.__aiter__(): 返回agen.
2.agen.__anext__(): 返回一個awaitable對象, 調用一次異步生成器的元素。
3.agen.asend(val): 返回一個awaitable對象,它在agen生成器中推送val對象。 當agen還沒迭代時,val必須為None。
上面的方法類似同步生成器的使用。
代碼例子:

import asyncio


async def gen():
    await asyncio.sleep(0.1)
    v = yield 42
    print(v)
    await asyncio.sleep(0.2)



async def start():
    g = gen()

    await g.asend(None)  # Will return 42 after sleeping
    # for 0.1 seconds.

    await g.asend('hello')  # Will print 'hello' and
    # raise StopAsyncIteration
    # (after sleeping for 0.2 seconds.)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(start())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

4.agen.athrow(typ, [val, [tb]]): 返回一個awaitable對象, 這會向agen生成器拋出一個異常。
代碼如下:

import asyncio


async def gen():
    try:
        await asyncio.sleep(0.1)
        yield 'hello'
    except IndexError:
        await asyncio.sleep(0.2)
        yield 'world'


async def start():
    g = gen()
    v = await g.asend(None)
    print(v)  # Will print 'hello' after
    # sleeping for 0.1 seconds.

    v = await g.athrow(IndexError)
    print(v)  # Will print 'world' after
    # $ sleeping 0.2 seconds.


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(start())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

5.agen.aclose(): 返回一個awaitable對象, 調用該方法會拋出一個異常給生成器。

import asyncio


async def gen():
    try:
        await asyncio.sleep(0.1)
        v = yield 42
        print(v)
        await asyncio.sleep(0.2)
    except:
         print('運行結束'


async def start():
    g = gen()
    v=await g.asend(None)
    print(v)
    await g.aclose() #不做異常處理會報錯


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(start())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

6.agen.__name__ and agen.__qualname__:可以返回異步生成器函數(shù)的名字。

async def gen():
    try:
        await asyncio.sleep(0.1)
        v = yield 42
        print(v)
        await asyncio.sleep(0.2)
    except:
         print('運行結束')

async def start():
    g = gen()
    print(g.__aiter__())#輸出async_generator對象
    print(g.__name__)#輸出gen
    print(g.__qualname__)#輸出gen

其他的方法

agen.ag_await: 正等待的對象(None). 類似當前可用的 gi_yieldfrom for generators and cr_await for coroutines.
agen.ag_frame, agen.ag_running, and agen.ag_code: 同生成器一樣

StopIteration and StopAsyncIteration 被替換為 RuntimeError,并且不上拋。

源碼實現(xiàn)細節(jié)

異步生成器對象(PyAsyncGenObject)與PyGenObject共享結構布局。 除此之外,參考實現(xiàn)還引入了三個新對象:
PyAsyncGenASend:實現(xiàn)__anext__和asend()方法的等待對象。
PyAsyncGenAThrow:實現(xiàn)athrow()和aclose()方法的等待對象。
PyAsyncGenWrappedValue:來自異步生成器的每個直接生成的對象都隱式地裝入此結構中。 這就是生成器實現(xiàn)如何使用常規(guī)迭代協(xié)議從使用異步迭代協(xié)議生成的對象中分離出的對象。PyAsyncGenASend和PyAsyncGenAThrow是awaitable對象(它們有__await_方法返回self)類似于coroutine的對象(實現(xiàn)__iter__,__ next__,send()和throw()方法)。 本質上,它們控制異步生成器的迭代方式

PyAsyncGenASend and PyAsyncGenAThrow

PyAsyncGenASend類似生成器對象驅動__anext__ and asend() 方法,實裝了異步迭代協(xié)議。
agen.asend(val) 和agen.__anext__() 返回一個PyAsyncGenASend對象的一個引用。 (它將引用保存回父類agen對象。)
數(shù)據(jù)流定義如下:
1.首次調用PyAsyncGenASend.send(val)時, val將推入到父類agen對象 (PyGenObject利用現(xiàn)有對象。)
對PyAsyncGenASend對象進行后續(xù)迭代,將None推送到agen。
2.首次調用_PyAsyncGenWrappedValue對象時,它將被拆箱,并且以未被裝飾的值作為參數(shù)會引發(fā)StopIteration異常。
3.異步生成器中的return語句引發(fā)StopAsyncIteration異常,該異常通過PyAsyncGenASend.send()和PyAsyncGenASend.throw()方法傳播。
4.PyAsyncGenAThrow與PyAsyncGenASend非常相似。 唯一的區(qū)別是PyAsyncGenAThrow.send()在第一次調用時會向父類agen對象拋出異常(而不是將值推入其中。)

新的標準庫方法和Types

1.types.AsyncGeneratorType -- 判斷是否是異步生成器對象
2.sys.set_asyncgen_hooks()和 sys.get_asyncgen_hooks()--
在事件循環(huán)中設置異步生成器終結器和迭代攔截器。
3.inspect.isasyncgen()和 inspect.isasyncgenfunction() :方法內省。
4.asyncio加入新方法:loop.shutdown_asyncgens().
5.collections.abc.AsyncGenerator:抽象基類的添加。

是否支持向后兼容

該提案完全支持向后兼容
在python3.5,async def里使用yield會報錯,因此在python3.6引入了安全的異步生成器

性能展示

常規(guī)生成器

import time


def gen():
    i = 0
    while i < 100000000:
        yield i
        i += 1


if __name__ == '__main__':
    start = time.time()
    list(gen())
    end = time.time()
    print('totals time'end - start)

輸出

totals time 14.837260007858276

15s左右

異步迭代器的改進

import time
import asyncio

N = 10 ** 7
class AIter:
    def __init__(self):
        self.i = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        if i >= N:
            raise StopAsyncIteration
        self.i += 1
        return i

async def start():
    [_ async for _ in AIter()]
if __name__ == '__main__':
    s=time.time()
    loop=asyncio.get_event_loop()
    try:
     loop.run_until_complete(start())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
    e=time.time()
    print('total time',e-s)

輸出

total time 5.441649913787842

很明顯迭代異步生成器的速度比迭代普通生成器不只是快了兩倍。
我們可以做一個更簡單的異步生成器

import time
import asyncio

async def ticker(delay, to):
    '''Yield numbers from 0 to `to` every `delay` seconds.'''
    for i in range(to):
        yield i
        await asyncio.sleep(delay)

async def start():
    async for item in ticker(0.000001,100):
        print(item)
if __name__ == '__main__':
    s=time.time()
    loop=asyncio.get_event_loop()
    try:
     loop.run_until_complete(start())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
    e=time.time()
    print('total time',e-s)

設計中要注意的事項

內建函數(shù):aiter() and anext()
最初,PEP 492將__aiter__定義為應返回等待對象的方法,從而產(chǎn)生異步迭代器。
但是,在CPython 3.5.2中,重新定義了__aiter__可以直接返回異步迭代器。
為了避免破壞向后兼容性,決定Python 3.6將支持兩種方式:__aiter__仍然可以在發(fā)出DeprecationWarning時返回等待狀態(tài)。由于Python 3.6中__aiter__的這種雙重性質,我們無法添加內置的aiter()的同步實現(xiàn)。 因此,建議等到Python 3.7。

異步list/dict/set 推導式

將放在單獨的pep中也就是后來的pep530.

異步y(tǒng)ield from

對于異步生成器,yield from也不那么重要,因為不需要提供在協(xié)程之上實現(xiàn)另一個協(xié)同程序協(xié)議的機制。為了組合異步生成器,可以使用async for簡化這個過程:

async def g1():
    yield 1
    yield 2

async def g2():
    async for v in g1():
        yield v

為了asend()和athrow()是必須的

它們可以使用異步生成器實現(xiàn)類似于contextlib.contextmanager的概念。 例如,可以實現(xiàn)以下模式:

@async_context_manager
async def ctx():
    await open()
    try:
        yield
    finally:
        await close()

async with ctx():
    await ...

另一個原因是從__anext__對象返回的對象來推送數(shù)據(jù)并將異常拋出到異步生成器中,很難正確地執(zhí)行此操作。 添加顯式的asend()和athrow()更獲取異常后的數(shù)據(jù)。
在實現(xiàn)方面,asend()是__anext__更通用的版本,而athrow()與aclose()非常相似。 因此,為異步生成器定義這些方法不會增加任何額外的復雜性。

代碼示例

async def ticker(delay, to):
    for i in range(to):
        yield i
        await asyncio.sleep(delay)


async def run():
    async for i in ticker(110):
        print(i)


import asyncio
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(run())
finally:
    loop.close()

這代碼將打出0-9,每個數(shù)字之間的間隔為1s。

提議者

Guido, 2016年9月6日

參考資料

[1]    https://github.com/1st1/cpython/tree/async_gen
[2]    https://mail.python.org/pipermail/python-dev/2016-September/146267.html
[3]    http://bugs.python.org/issue28003

版權聲明

    本站是提供個人知識管理的網(wǎng)絡存儲空間,所有內容均由用戶發(fā)布,不代表本站觀點。請注意甄別內容中的聯(lián)系方式、誘導購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權內容,請點擊一鍵舉報。
    轉藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多