WinhcAllClient.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2021/5/27 14:31
  3. # @Author : XuJiakai
  4. # @File : WinhcAllClient
  5. # @Software: PyCharm
  6. import psycopg2
  7. import pymysql
  8. from elasticsearch import Elasticsearch
  9. from kafka import KafkaConsumer, TopicPartition
  10. from kafka import KafkaProducer
  11. from odps import ODPS
  12. from pymongo import MongoClient
  13. from env.environment_switch import environment_switch
  14. from sdk.WinhcHbaseApi import WinhcHbaseApi
  15. class WinhcAllClient:
  16. def __init__(self):
  17. self.env = environment_switch()
  18. pass
  19. def get_hbase_client(self):
  20. host = self.env.get_val('winhc_open_api.eci_data.host')
  21. return WinhcHbaseApi(host)
  22. def get_es_client(self, es_name='new'):
  23. hosts = self.env.get_val('es.' + es_name + '.hosts')
  24. username = self.env.get_val('es.' + es_name + '.username')
  25. password = self.env.get_val('es.' + es_name + '.pwd')
  26. use_ssl = False
  27. return Elasticsearch(hosts.split(','), http_auth=(username, password), use_ssl=use_ssl)
  28. def get_mongo_db(self, db='itslaw', mongo_name='itslaw'):
  29. url = self.env.get_val('mongo.' + mongo_name + '.url')
  30. c = MongoClient(url)
  31. return c[db]
  32. def get_mongo_collection(self, coll_name, db='itslaw', mongo_name='itslaw'):
  33. db = self.get_mongo_db(db=db, mongo_name=mongo_name)
  34. return db[coll_name]
  35. def get_kafka_sender(self, name='base'):
  36. hosts = self.env.get_val('kafka.' + name + '.hosts')
  37. return KafkaProducer(bootstrap_servers=hosts,
  38. api_version=(0, 10),
  39. retries=3)
  40. def get_kafka_consumer(self, group_id, name='base'):
  41. hosts = self.env.get_val('kafka.' + name + '.hosts')
  42. return KafkaConsumer(bootstrap_servers=hosts,
  43. group_id=group_id,
  44. api_version=(0, 10, 2),
  45. session_timeout_ms=25000,
  46. max_poll_records=100,
  47. fetch_max_bytes=1 * 1024 * 1024)
  48. def get_kafka_consumer_by_reset_offset(self, topic, group_id, name='base', partitions=None):
  49. consumer = self.get_kafka_consumer(group_id=group_id, name=name)
  50. if not partitions:
  51. p = consumer.partitions_for_topic(topic=topic)
  52. partitions = [TopicPartition(topic=topic, partition=i) for i in p]
  53. consumer.assign(partitions)
  54. for i in partitions:
  55. consumer.seek(i, 0)
  56. return consumer
  57. def get_mysql_client(self, db, name='test_firefly'):
  58. host = self.env.get_val('mysql.' + name + '.host')
  59. port = self.env.get_val('mysql.' + name + '.port')
  60. user = self.env.get_val('mysql.' + name + '.user')
  61. pwd = self.env.get_val('mysql.' + name + '.pwd')
  62. return pymysql.connect(host=host, port=int(port), user=user, passwd=pwd, db=db, charset='utf8')
  63. def get_holo_client(self, name='base', db=None):
  64. host = self.env.get_val('holo.' + name + '.host')
  65. port = self.env.get_val('holo.' + name + '.port')
  66. if not db:
  67. db = self.env.get_val('holo.' + name + '.dbname')
  68. user = self.env.get_val('holo.' + name + '.username')
  69. pwd = self.env.get_val('holo.' + name + '.pwd')
  70. return psycopg2.connect(host=host, port=int(port), dbname=db, user=user, password=pwd)
  71. pass
  72. def get_all_client():
  73. return WinhcAllClient()
  74. pass
  75. def get_odps_sdk(project='winhc_ng'):
  76. return ODPS('LTAI4FynxS5nNuKyZ3LHhMX5', 'r6gWoySXC8kSK4qnfKRxEuWJ5uHIiE', project,
  77. endpoint='http://service.odps.aliyun.com/api')
  78. if __name__ == '__main__':
  79. pass