@@ -526,3 +526,322 @@ asyncio 中的 `wait` 和 `gather` 类似,其提供了这种情况下更加具
526526
527527` wait ` 签名的本质是一系列 awaitable 对象的列表,伴随着可选的超时和 ` return_when ` 字符串。
528528该字符串有几个预定义的值:` ALL_COMPLETED ` , ` FIRST_EXCEPTION ` 和 ` FIRST_COMPLETED ` ,默认是 ` ALL_COMPLETED ` 。
529+
530+ > 实际上,现代版本的 Python ` asyncio.wait ` 只接受 ` task ` 对象。
531+
532+ ### Waiting for all tasks to complete
533+
534+ 不指定 ` return_when ` 的默认情况下,会在所有任务完成后再返回,这和 ` asyncio.gather ` 很像,但有店差别。
535+
536+ ``` Python
537+ import asyncio
538+ import aiohttp
539+ from aiohttp import ClientSession
540+ from util import async_timed
541+
542+ async def fetch_status (session : ClientSession, url : str ) -> int :
543+ async with session.get(url, ssl = False ) as result:
544+ return result.status
545+
546+ @async_timed ()
547+ async def main ():
548+ async with aiohttp.ClientSession() as session:
549+ fetchers = [
550+ asyncio.create_task(fetch_status(session, ' https://example.com' )),
551+ asyncio.create_task(fetch_status(session, ' https://example.com' )),
552+ ]
553+ done, pending = await asyncio.wait(fetchers)
554+
555+ print (' Done task count:' , len (done))
556+ print (' Pending task count:' , len (pending))
557+
558+ for done_task in done:
559+ result = await done_task
560+ print (result)
561+
562+ asyncio.run(main())
563+ ```
564+
565+ 输出如下:
566+
567+ ``` text
568+ Starting <function main at 0x102d19da0> with args () {}
569+ Done task count: 2
570+ Pending task count: 0
571+ 200
572+ 200
573+ Finished <function main at 0x102d19da0> in 0.6201 second(s)
574+ ```
575+
576+ 如果有一个请求报错,` asyncio.wait ` 并不会像 ` asyncio.gather ` 那样抛出(第一个)异常。
577+ 在这种情况下,有这些异常处理方法。
578+ 可以使用 ` await ` 并抛出异常,也可以使用 ` await ` 将其包裹在 ` try except ` 块中以处理异常。
579+ 或者可以使用 ` task.result() ` 和 ` task.exception() ` 方法。
580+ 我们可以安全地调用这些方法,因为 ` done ` 集合中的任务都是已完成的任务;如果尚未完成就调用这些方法则会抛出异常。
581+
582+ 下面使用两个 Task 方法来处理异常:处理完成打印结果,如果报错则记录日志。
583+
584+ ``` Python
585+ import asyncio
586+ import logging
587+
588+ @async_timed ()
589+ async def main ():
590+ async with aiohttp.ClientSession() as session:
591+ good_request = fetch_status(session, ' https://www.example.com' )
592+ bad_request = fetch_status(session, ' python://bad' )
593+
594+ fetchers = [
595+ asyncio.create_task(good_request),
596+ asyncio.create_task(bad_request),
597+ ]
598+ done, pending = await asyncio.wait(fetchers)
599+
600+ print (' Done task count:' , len (done))
601+ print (' Pending task count:' , len (pending))
602+
603+ for done_task in done:
604+ # result = await done_task will throw an exception
605+ if done_task.exception() is None :
606+ print (done_task.result())
607+ else :
608+ logging.error(' Request got an exception' , exc_info = done_task.exception())
609+
610+ asyncio.run(main())
611+ ```
612+
613+ 使用 ` done_task.exception() ` 会检查是否有报错。
614+ 如果没有则可以使用 ` done_task.result() ` 获取结果。
615+ 如果 ` exception ` 不是 ` None ` ,那么就产生了报错,需要进行处理。
616+ 这里简单打印了异常栈跟踪,运行该代码会输出类似内容:
617+
618+ ``` text
619+ ERROR:root:Request got an exception
620+ Traceback (most recent call last):
621+ File "/Users/starslayerx/GitHub/book_asyncio/async_wait.py", line 8, in fetch_status
622+ async with session.get(url, ssl=False) as result:
623+ File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/aiohttp/client.py", line 1510,
624+ in __aenter__
625+ self._resp: _RetType = await self._coro
626+ ^^^^^^^^^^^^^^^^
627+ File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/aiohttp/client.py", line 558, i
628+ n _request
629+ raise NonHttpUrlClientError(url)
630+ aiohttp.client_exceptions.NonHttpUrlClientError: python://bad
631+ ```
632+
633+ ### Waiting for exceptions
634+
635+ ` ALL_COMPLETED ` 的缺点和 ` gather ` 函数一样。
636+ 即无法在运行中处理异常,必须等待所有任务处理完成。
637+ 这会导致一个问题,即对于某些异常,我们可能会希望取消剩下的所有任务。
638+ 因此我们希望立刻处理产生错误的异常,然后继续运行剩下的任务。
639+
640+ 为了支持这种情况,` wait ` 使用 ` FIRST_EXCEPTION ` 选项。
641+ 下面是一个在遇到异常时取消请求的例子。
642+
643+ ``` Python
644+ import aiohttp
645+ import asyncio
646+ import loggin
647+ from util import async_timed
648+
649+
650+ async def fetch_status (session : ClientSession, url : str , delay : int | None = None ) -> int :
651+ if delay:
652+ asyncio.sleep(delay)
653+ async with session.get(url, ssl = False ) as result:
654+ return result.status
655+
656+ @async_timed ()
657+ async def main ():
658+ async with aiohttp.ClientSession() as session:
659+ fetchers = [
660+ asyncio.create_task(fetch_status(session, ' python://bad.com' )),
661+ asyncio.create_task(fetch_status(session, ' https://www.example.com' , delay = 3 )),
662+ asyncio.create_task(fetch_status(session, ' https://www.example.com' , delay = 3 )),
663+ ]
664+
665+ done, pending = await asyncio.wait(fetchers, return_when = asyncio.FIRST_EXCEPTION )
666+
667+ print (' Done task count:' , len (done))
668+ print (' Pending task count:' , len (pending))
669+
670+ for done_task in done:
671+ if done_task.exception() is None :
672+ print (done_task.result())
673+ else :
674+ logging.error(' Request got an exception' , exc_info = done_task.exception())
675+
676+ for pending_task in pending:
677+ pending_task.cancel()
678+
679+ asyncio.run(main())
680+ ```
681+
682+ 在当前列表中,有一个错误请求和两个正确请求,每个持续 3 秒。
683+ 当 ` await asyncio.wait ` 语句时,会立刻返回错误请求。
684+ 然后循环 ` done ` 任务集合,这种情况下,只有一个已完成任务。
685+ 然后,会执行分支代码,并打印异常。
686+
687+ 在 ` pending ` set 中有两个元素,每个请求消耗大概 3 秒,而且第一个请求几乎立刻失败。
688+ 因为这里希望停止运行,对 task 调用 ` cancel ` 方法。
689+ 有以下的输出:
690+
691+ ``` text
692+ Starting <function main at 0x102af1e40> with args () {}
693+ Done task count: 1
694+ Pending task count: 2
695+ ERROR:root:Request got an exception
696+ Traceback (most recent call last):
697+ File "/Users/starslayerx/GitHub/book_asyncio/async_wait.py", line 10, in fetch_status
698+ async with session.get(url, ssl=False) as result:
699+ File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/aiohttp/client.py", line 1510, i
700+ n __aenter__
701+ self._resp: _RetType = await self._coro
702+ ^^^^^^^^^^^^^^^^
703+ File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/aiohttp/client.py", line 558, in
704+ _request
705+ raise NonHttpUrlClientError(url)
706+ aiohttp.client_exceptions.NonHttpUrlClientError: python://bad.com
707+ Finished <function main at 0x102af1e40> in 0.0015 second(s)
708+ ```
709+
710+ ### Processing results as they complete
711+
712+ ` ALL_COMPLETED ` 和 ` FIRST_EXCEPTION ` 都有一个缺陷,即如果没有出现异常,他们都必须等到所有任务完成。
713+ 如果希望在结果返回后能立刻处理它,类似 ` as_completed ` 函数那样,但该函数看不到哪些任务完成了,哪些任务还在运行。
714+ 此时,可以使用 ` return_when ` 的 ` FIRST_COMPLETED ` 选项。
715+
716+ 该选项会让 ` wait ` 有至少一个返回值后返回,可能是 task 完成或出现异常。
717+ 然后可以据此,取消其他任务,或调整还在运行中的任务。
718+ 下面通过该选项发送几个 web 请求,并处理最先完成的哪个。
719+
720+ ``` Python
721+ import asyncio
722+ import aiohttp
723+ from util import async_timed
724+
725+ @async_timed ()
726+ async def main ():
727+ async with aiohttp.ClientSession() as session:
728+ url = ' https://www.example.com'
729+ fetchers = [
730+ asyncio.create_task(fetch_status(session, url)),
731+ asyncio.create_task(fetch_status(session, url)),
732+ asyncio.create_task(fetch_status(session, url)),
733+ ]
734+ done, pending = await asyncio.wait(fetchers, return_when = asyncio.FIRST_COMPLETED )
735+
736+ print (' Done task count:' , len (done))
737+ print (' Pending task count:' , len (pending))
738+
739+ for done_task in done:
740+ print (await done_task)
741+
742+ asyncio.run(main())
743+ ```
744+
745+ 输出如下,一个请求完成后立刻返回:
746+
747+ ``` text
748+ Starting <function main at 0x104ee59e0> with args () {}
749+ Done task count: 1
750+ Pending task count: 2
751+ 200
752+ Finished <function main at 0x104ee59e0> in 0.6419 second(s)
753+ ```
754+
755+ 这些请求几乎可以在同一时间完成,因此还可能会看到输出显示有两到三个任务已完成。
756+
757+ 下面例子中,循环判断 ` pending ` 集合中是否还有任务。
758+ 一旦 ` wait ` 获得结果,就更新 ` done ` 和 ` pending ` 集合,然后打印出所有已完成的任务。
759+ 这将产生类似于 ` as_completed ` 的行为,不同之处在于能更清晰地了解哪些任务已完成。
760+
761+ ``` Python
762+ import asyncio
763+ import aiohttp
764+ from util import async_timed
765+
766+ async def fetch_status (session : aiohttp.ClientSession, url : str ) -> int :
767+ async with session.get(url, ssl = False ) as result:
768+ return result.status
769+
770+ @async_timed ()
771+ async def main ():
772+ async with aiohttp.ClientSession() as session:
773+ url = ' https://www.example.com'
774+ pending = [
775+ asyncio.create_task(fetch_status(session, url)),
776+ asyncio.create_task(fetch_status(session, url)),
777+ asyncio.create_task(fetch_status(session, url)),
778+ ]
779+
780+ while pending:
781+ done, pending = asyncio.wait(pending, return_when = asyncio.FIRST_COMPLETED )
782+ print (' Done task count:' , len (done))
783+ print (' Pending task count:' , len (pending))
784+
785+ for done_task in done:
786+ print (await done_task)
787+
788+ asyncio.run(main())
789+ ```
790+
791+ 输出如下
792+
793+ ``` text
794+ Starting <function main at 0x10222dda0> with args () {}
795+ Done task count: 1
796+ Pending task count: 2
797+ 200
798+ Done task count: 2
799+ Pending task count: 0
800+ 200
801+ 200
802+ Finished <function main at 0x10222dda0> in 0.7814 second(s)
803+
804+ ```
805+
806+ 也可能这样
807+
808+ ``` text
809+ Starting <function main at 0x1028fdda0> with args () {}
810+ Done task count: 1
811+ Pending task count: 2
812+ 200
813+ Done task count: 1
814+ Pending task count: 1
815+ 200
816+ Done task count: 1
817+ Pending task count: 0
818+ 200
819+ Finished <function main at 0x1028fdda0> in 0.7221 second(s)
820+ ```
821+
822+ ### Handling timeouts
823+
824+ 为了更细粒度的控制,` wait ` 还允许我们设置超时时间。
825+ 可以通过设置 ` timeout ` 参数来指定最大的秒数。
826+ 如果超出超时时间,` wait ` 会返回 ` done ` 和 ` pending ` 任务集合。
827+ 对比之前的 ` wait_for ` 和 ` as_completed ` ,` wait ` 在处理超时方面有一些差异。
828+
829+ - ` asyncio.wait_for ` 将单个特定任务设置"硬性时间",一旦超时会自动取消正在运行的任务,抛出 ` asyncio.TimeoutError ` 或 ` TimeoutError ` (Python 3.11+)
830+
831+ ``` Python
832+ try :
833+ result = await asyncio.wait_for(my_coro(), tiemout = 5.0 )
834+ except asyncio.TimeoutError:
835+ print (' 任务超时,且已被自动取消' )
836+ ```
837+
838+ - ` asyncio.as_completed ` 并发处理多个任务,并在任何一个任务完成时处理结果,其超时时间是对于整个迭代过程的。超时并不会自动取消剩余任务,需要手动处理剩下任务。
839+
840+ ``` Python
841+ for coro in asyncio.as_completed([task1, task2], timeout = 2.0 )
842+ try :
843+ result = await coro
844+ print (f ' 得到结果: { result} ' )
845+ except asyncio.TimeoutError:
846+ print (' 时间到了,剩下的任务还在后台跑,但我等不到了' )
847+ ```
0 commit comments