123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- # -*- coding: utf-8 -*-
- # @Time : 2023/7/20 16:29
- # @Author : XuJiakai
- # @File : http_api
- # @Software: PyCharm
- import asyncio
- import contextlib
- import aiohttp
- from aiohttp import ClientSession
- from loguru import logger as log
- from data_clean.exception.fetch_exception import FetchException
- async def get(url: str, result_json: bool = True):
- async with aiohttp.ClientSession() as session:
- async with session.get(url) as response:
- if result_json:
- result = await response.json()
- else:
- result = await response.text()
- if response.status != 200:
- raise FetchException(response.status, result)
- return result
- pass
- async def post(url: str, data: dict):
- async with aiohttp.ClientSession() as session:
- async with session.post(url, json=data) as response:
- result = await response.json()
- if response.status != 200:
- raise FetchException(response.status, result)
- return result
- pass
- class HttpSessionReuse:
- def __init__(self, max_pool: int = 1):
- self._session_pool: list[ClientSession] = [None] * max_pool
- self._index = 0
- self._max_pool = max_pool
- self._lock = asyncio.Lock()
- pass
- async def _create_session(self, index: int):
- async with self._lock:
- if self._session_pool[index]:
- return
- self._session_pool[index] = aiohttp.ClientSession()
- log.info("init client session, session index : {}", index)
- pass
- pass
- @contextlib.asynccontextmanager
- async def get_session(self) -> ClientSession:
- self._index = (self._index + 1) % self._max_pool
- tmp_index = self._index
- if self._session_pool[tmp_index]:
- yield self._session_pool[tmp_index]
- pass
- else:
- await self._create_session(tmp_index)
- yield self._session_pool[tmp_index]
- pass
- async def post(self, url: str, data: dict):
- async with self.get_session() as session:
- async with session.post(url, json=data) as response:
- result = await response.json()
- if response.status != 200:
- raise FetchException(response.status, result)
- return result
- pass
- async def get(self, url: str, result_json: bool = True):
- async with self.get_session() as session:
- async with session.get(url) as response:
- if result_json:
- result = await response.json()
- else:
- result = await response.text()
- if response.status != 200:
- raise FetchException(response.status, result)
- return result
- async def __aenter__(self) -> "HttpSessionReuse":
- return self
- async def __aexit__(self) -> None:
- await self.release()
- async def release(self):
- for i in range(len(self._session_pool)):
- if self._session_pool[i]:
- await self._session_pool[i].close()
- log.info('close client session, session index :{}', i)
- pass
- if __name__ == '__main__':
- pass
|