http_api.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2023/7/20 16:29
  3. # @Author : XuJiakai
  4. # @File : http_api
  5. # @Software: PyCharm
  6. import asyncio
  7. import contextlib
  8. import aiohttp
  9. from aiohttp import ClientSession
  10. from loguru import logger as log
  11. from data_clean.exception.fetch_exception import FetchException
  12. async def get(url: str, result_json: bool = True):
  13. async with aiohttp.ClientSession() as session:
  14. async with session.get(url) as response:
  15. if result_json:
  16. result = await response.json()
  17. else:
  18. result = await response.text()
  19. if response.status != 200:
  20. raise FetchException(response.status, result)
  21. return result
  22. pass
  23. async def post(url: str, data: dict):
  24. async with aiohttp.ClientSession() as session:
  25. async with session.post(url, json=data) as response:
  26. result = await response.json()
  27. if response.status != 200:
  28. raise FetchException(response.status, result)
  29. return result
  30. pass
  31. class HttpSessionReuse:
  32. def __init__(self, max_pool: int = 1):
  33. self._session_pool: list[ClientSession] = [None] * max_pool
  34. self._index = 0
  35. self._max_pool = max_pool
  36. self._lock = asyncio.Lock()
  37. pass
  38. async def _create_session(self, index: int):
  39. async with self._lock:
  40. if self._session_pool[index]:
  41. return
  42. self._session_pool[index] = aiohttp.ClientSession()
  43. log.info("init client session, session index : {}", index)
  44. pass
  45. pass
  46. @contextlib.asynccontextmanager
  47. async def get_session(self) -> ClientSession:
  48. self._index = (self._index + 1) % self._max_pool
  49. tmp_index = self._index
  50. if self._session_pool[tmp_index]:
  51. yield self._session_pool[tmp_index]
  52. pass
  53. else:
  54. await self._create_session(tmp_index)
  55. yield self._session_pool[tmp_index]
  56. pass
  57. async def post(self, url: str, data: dict):
  58. async with self.get_session() as session:
  59. async with session.post(url, json=data) as response:
  60. result = await response.json()
  61. if response.status != 200:
  62. raise FetchException(response.status, result)
  63. return result
  64. pass
  65. async def get(self, url: str, result_json: bool = True):
  66. async with self.get_session() as session:
  67. async with session.get(url) as response:
  68. if result_json:
  69. result = await response.json()
  70. else:
  71. result = await response.text()
  72. if response.status != 200:
  73. raise FetchException(response.status, result)
  74. return result
  75. async def __aenter__(self) -> "HttpSessionReuse":
  76. return self
  77. async def __aexit__(self) -> None:
  78. await self.release()
  79. async def release(self):
  80. for i in range(len(self._session_pool)):
  81. if self._session_pool[i]:
  82. await self._session_pool[i].close()
  83. log.info('close client session, session index :{}', i)
  84. pass
  85. if __name__ == '__main__':
  86. pass