# asyncio_pool.py (3.6 KB)
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2023/7/20 16:10
  3. # @Author : XuJiakai
  4. # @File : async_pool
  5. # @Software: PyCharm
  6. import asyncio
  7. import functools
  8. import signal
  9. import sys
  10. from typing import Coroutine
  11. from loguru import logger as log
  12. class AsyncPool(object):
  13. def __init__(self, max_concurrency: int):
  14. self._semaphore: asyncio.Semaphore = asyncio.Semaphore(max_concurrency)
  15. self._running_task_num = 0
  16. async def create_task(self, coro: Coroutine) -> asyncio.Task:
  17. await self._semaphore.acquire()
  18. self._running_task_num += 1
  19. task: asyncio.Task = asyncio.create_task(coro)
  20. task.add_done_callback(lambda t: self._release(t))
  21. return task
  22. def is_done(self):
  23. return self._running_task_num == 0
  24. async def wait_all_done(self):
  25. while self._running_task_num > 0:
  26. log.info("running task num : {} , wait done...", self._running_task_num)
  27. await asyncio.sleep(1)
  28. def _release(self, t):
  29. self._running_task_num -= 1
  30. self._semaphore.release()
  31. class GracefulExit(SystemExit):
  32. code = 1
  33. class AsyncPoolListenShut:
  34. def __init__(self, max_concurrency: int):
  35. self._semaphore: asyncio.Semaphore = asyncio.Semaphore(max_concurrency)
  36. self._data: dict = {}
  37. self._look = False
  38. self._is_windows = sys.platform == 'win32'
  39. self._register_shutdown_by_signal()
  40. self._current_data = None
  41. def _register_shutdown_by_signal(self):
  42. print("注册shutdown")
  43. signal.signal(signal.SIGINT, functools.partial(self.listen_shutdown))
  44. signal.signal(signal.SIGTERM, functools.partial(self.listen_shutdown))
  45. pass
  46. async def create_task(self, coro: Coroutine, data) -> asyncio.Task:
  47. self._current_data = data
  48. if self._look:
  49. print("停止消费...")
  50. await asyncio.sleep(10000)
  51. pass
  52. await self._semaphore.acquire()
  53. task: asyncio.Task = asyncio.create_task(coro)
  54. print('创建task,id: ', id(task))
  55. self._data[task] = data
  56. task.add_done_callback(lambda t: self._release(t))
  57. return task
  58. pass
  59. def _release(self, t):
  60. print("释放task,id:", id(t))
  61. del self._data[t]
  62. self._semaphore.release()
  63. async def _shutdown(self):
  64. print("检测到停止信号...")
  65. # await self._look.acquire()
  66. count = len(self._data)
  67. print('当前数量%s' % count)
  68. num = 0
  69. while count > 0:
  70. num += 1
  71. print("\n第%s" % num)
  72. for t in self._data.keys():
  73. print(id(t), "是否结束", t.done())
  74. count -= 1 if t.done() else 0
  75. await asyncio.sleep(3)
  76. await asyncio.sleep(3)
  77. print("所有任务已经结束!")
  78. def listen_shutdown(self, *args, **kwargs):
  79. all_data = list(self._data.values()) + [self._current_data]
  80. print("all_data: ", all_data)
  81. self._look = True
  82. print("所有任务已经结束!")
  83. loop = asyncio.get_running_loop()
  84. tasks = asyncio.tasks.all_tasks(loop)
  85. for t in tasks:
  86. t.cancel()
  87. loop.stop()
  88. # raise GracefulExit()
  89. pass
  90. pass
  91. async def run1(tt=None):
  92. print("sleeping")
  93. await asyncio.sleep(3)
  94. print("slept")
  95. if tt:
  96. print(tt)
  97. pass
  98. async def callback(msg):
  99. print("callback")
  100. await asyncio.sleep(3)
  101. pass
  102. async def main():
  103. pool = AsyncPool(5)
  104. for i in range(10):
  105. await pool.create_task(run1())
  106. await pool.wait_all_done()
  107. pass
  108. if __name__ == '__main__':
  109. asyncio.run(main())
  110. pass