forked from china-testing/python-api-tesing
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathasyncio_wait.py
More file actions
33 lines (27 loc) · 828 Bytes
/
asyncio_wait.py
File metadata and controls
33 lines (27 loc) · 828 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author: xurongzhong#126.com wechat:pythontesting qq:37391319
# CreateDate: 2018-1-26
# From https://pymotw.com/3/asyncio/coroutines.html
# asyncio_wait.py
import asyncio
async def phase(i):
print('in phase {}'.format(i))
await asyncio.sleep(0.1 * i)
print('done with phase {}'.format(i))
return 'phase {} result'.format(i)
async def main(num_phases):
print('starting main')
phases = [
phase(i)
for i in range(num_phases)
]
print('waiting for phases to complete')
completed, pending = await asyncio.wait(phases)
results = [t.result() for t in completed]
print('results: {!r}'.format(results))
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(3))
finally:
event_loop.close()