asyncio_pool.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2023/7/20 16:10
  3. # @Author : XuJiakai
  4. # @File : async_pool
  5. # @Software: PyCharm
  6. import asyncio
  7. import functools
  8. import signal
  9. import sys
  10. from typing import Coroutine
  11. from data_clean.utils import get_log
  12. log = get_log("JobMain")
  13. class AsyncPool(object):
  14. def __init__(self, max_concurrency: int):
  15. self._semaphore: asyncio.Semaphore = asyncio.Semaphore(max_concurrency)
  16. self._running_task_num = 0
  17. async def create_task(self, coro: Coroutine) -> asyncio.Task:
  18. await self._semaphore.acquire()
  19. self._running_task_num += 1
  20. task: asyncio.Task = asyncio.create_task(coro)
  21. task.add_done_callback(lambda t: self._release(t))
  22. return task
  23. def is_done(self):
  24. return self._running_task_num == 0
  25. async def wait_all_done(self):
  26. while self._running_task_num > 0:
  27. log.info("wait done...")
  28. await asyncio.sleep(1)
  29. def _release(self, t):
  30. self._running_task_num -= 1
  31. self._semaphore.release()
  32. class GracefulExit(SystemExit):
  33. code = 1
  34. class AsyncPoolListenShut:
  35. def __init__(self, max_concurrency: int):
  36. self._semaphore: asyncio.Semaphore = asyncio.Semaphore(max_concurrency)
  37. self._data: dict = {}
  38. self._look = False
  39. self._is_windows = sys.platform == 'win32'
  40. self._register_shutdown_by_signal()
  41. self._current_data = None
  42. def _register_shutdown_by_signal(self):
  43. print("注册shutdown")
  44. signal.signal(signal.SIGINT, functools.partial(self.listen_shutdown))
  45. signal.signal(signal.SIGTERM, functools.partial(self.listen_shutdown))
  46. pass
  47. async def create_task(self, coro: Coroutine, data) -> asyncio.Task:
  48. self._current_data = data
  49. if self._look:
  50. print("停止消费...")
  51. await asyncio.sleep(10000)
  52. pass
  53. await self._semaphore.acquire()
  54. task: asyncio.Task = asyncio.create_task(coro)
  55. print('创建task,id: ', id(task))
  56. self._data[task] = data
  57. task.add_done_callback(lambda t: self._release(t))
  58. return task
  59. pass
  60. def _release(self, t):
  61. print("释放task,id:", id(t))
  62. del self._data[t]
  63. self._semaphore.release()
  64. async def _shutdown(self):
  65. print("检测到停止信号...")
  66. # await self._look.acquire()
  67. count = len(self._data)
  68. print('当前数量%s' % count)
  69. num = 0
  70. while count > 0:
  71. num += 1
  72. print("\n第%s" % num)
  73. for t in self._data.keys():
  74. print(id(t), "是否结束", t.done())
  75. count -= 1 if t.done() else 0
  76. await asyncio.sleep(3)
  77. await asyncio.sleep(3)
  78. print("所有任务已经结束!")
  79. def listen_shutdown(self, *args, **kwargs):
  80. all_data = list(self._data.values()) + [self._current_data]
  81. print("all_data: ", all_data)
  82. self._look = True
  83. print("所有任务已经结束!")
  84. loop = asyncio.get_running_loop()
  85. tasks = asyncio.tasks.all_tasks(loop)
  86. for t in tasks:
  87. t.cancel()
  88. loop.stop()
  89. # raise GracefulExit()
  90. pass
  91. pass
  92. async def run1(tt=None):
  93. print("sleeping")
  94. await asyncio.sleep(3)
  95. print("slept")
  96. if tt:
  97. print(tt)
  98. pass
  99. async def callback(msg):
  100. print("callback")
  101. await asyncio.sleep(3)
  102. pass
  103. async def main():
  104. pool = AsyncPool(5)
  105. for i in range(10):
  106. await pool.create_task(run1())
  107. await pool.wait_all_done()
  108. pass
  109. if __name__ == '__main__':
  110. asyncio.run(main())
  111. pass