# -*- coding: utf-8 -*- # @Time : 2023/7/20 16:29 # @Author : XuJiakai # @File : http_api # @Software: PyCharm import asyncio import contextlib import aiohttp from aiohttp import ClientSession from loguru import logger as log from data_clean.exception.fetch_exception import FetchException async def get(url: str, result_json: bool = True): async with aiohttp.ClientSession() as session: async with session.get(url) as response: if result_json: result = await response.json() else: result = await response.text() if response.status != 200: raise FetchException(response.status, result) return result pass async def post(url: str, data: dict): async with aiohttp.ClientSession() as session: async with session.post(url, json=data) as response: result = await response.json() if response.status != 200: raise FetchException(response.status, result) return result pass class HttpSessionReuse: def __init__(self, max_pool: int = 1): self._session_pool: list[ClientSession] = [None] * max_pool self._index = 0 self._max_pool = max_pool self._lock = asyncio.Lock() pass async def _create_session(self, index: int): async with self._lock: if self._session_pool[index]: return self._session_pool[index] = aiohttp.ClientSession() log.info("init client session, session index : {}", index) pass pass @contextlib.asynccontextmanager async def get_session(self) -> ClientSession: self._index = (self._index + 1) % self._max_pool tmp_index = self._index if self._session_pool[tmp_index]: yield self._session_pool[tmp_index] pass else: await self._create_session(tmp_index) yield self._session_pool[tmp_index] pass async def post(self, url: str, data: dict): async with self.get_session() as session: async with session.post(url, json=data) as response: result = await response.json() if response.status != 200: raise FetchException(response.status, result) return result pass async def get(self, url: str, result_json: bool = True): async with self.get_session() as session: async with session.get(url) as response: if result_json: result = await response.json() else: result = await response.text() if response.status != 200: raise FetchException(response.status, result) return result async def __aenter__(self) -> "HttpSessionReuse": return self async def __aexit__(self) -> None: await self.release() async def release(self): for i in range(len(self._session_pool)): if self._session_pool[i]: await self._session_pool[i].close() log.info('close client session, session index :{}', i) pass if __name__ == '__main__': pass