hbase_api.py 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2023/8/14 17:54
  3. # @Author : XuJiakai
  4. # @File : hbase_api
  5. # @Software: PyCharm
  6. from data_clean.api.http_api import HttpSessionReuse
  7. from data_clean.env.environment_switch import project_env
  8. from data_clean.exception.fetch_exception import FetchException
# Base address of the remote service, read from the project environment under
# the key 'winhc_open_api.eci_data.host'. get() and bulk_get() prepend it to
# their request paths.
_hosts = project_env.get_val('winhc_open_api.eci_data.host')
# Shared HTTP client through which every request in this module is sent.
# NOTE(review): the meaning of the argument 20 is not visible from this file
# (presumably a connection/concurrency limit) — confirm against HttpSessionReuse.
http_session_reuse = HttpSessionReuse(20)
  11. async def get(table_name: str, rowkey: str):
  12. result = await http_session_reuse.get(_hosts + '/hbase/get/' + table_name + '/' + rowkey)
  13. if not result['success']:
  14. raise FetchException(200, result, '内容异常')
  15. return result['data']
  16. pass
  17. async def bulk_get(table_name, rowkey: list, query_key: list = None):
  18. if query_key is None:
  19. query_key = []
  20. rowkey = list(set(rowkey))
  21. result = await http_session_reuse.post(_hosts + '/hbase/bulk-get?tableName=' + table_name, data={
  22. "query_key": query_key,
  23. "rowkey": rowkey
  24. })
  25. if not result['success']:
  26. raise FetchException(200, result, '内容异常')
  27. r = []
  28. result = result['data']
  29. for k in result:
  30. result[k]['ROWKEY'] = k
  31. r.append(result[k])
  32. return r
  33. pass
  34. if __name__ == '__main__':
  35. import asyncio
  36. asyncio.run(get("ng_rt_company", "bc702f0f5202342a9c1c75fbf9be9aff"))
  37. # asyncio.run(bulk_get("ng_rt_company", ["a33f3cd172f8f9bd61d5f3cd84a4ffd9", "bc702f0f5202342a9c1c75fbf9be9aff"]))
  38. pass