WinhcAllClient.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2021/5/27 14:31
  3. # @Author : XuJiakai
  4. # @File : WinhcAllClient
  5. # @Software: PyCharm
  6. import psycopg2
  7. import pymysql
  8. from elasticsearch import Elasticsearch
  9. from kafka import KafkaConsumer, TopicPartition
  10. from kafka import KafkaProducer
  11. from odps import ODPS
  12. from pymongo import MongoClient
  13. from env.environment_switch import environment_switch
  14. from sdk.WinhcHbaseApi import WinhcHbaseApi
  15. from sdk.RabbitMQ import RabbitMQ
  16. class WinhcAllClient:
  17. def __init__(self):
  18. self.env = environment_switch()
  19. pass
  20. def get_hbase_client(self):
  21. host = self.env.get_val('winhc_open_api.eci_data.host')
  22. return WinhcHbaseApi(host)
  23. def get_es_client(self, es_name='new'):
  24. hosts = self.env.get_val('es.' + es_name + '.hosts')
  25. username = self.env.get_val('es.' + es_name + '.username')
  26. password = self.env.get_val('es.' + es_name + '.pwd')
  27. use_ssl = False
  28. return Elasticsearch(hosts.split(','), http_auth=(username, password), use_ssl=use_ssl)
  29. def get_mongo_db(self, db='itslaw', mongo_name='itslaw'):
  30. url = self.env.get_val('mongo.' + mongo_name + '.url')
  31. c = MongoClient(url)
  32. return c[db]
  33. def get_mongo_collection(self, coll_name, db='itslaw', mongo_name='itslaw'):
  34. db = self.get_mongo_db(db=db, mongo_name=mongo_name)
  35. return db[coll_name]
  36. def get_kafka_sender(self, name='base'):
  37. hosts = self.env.get_val('kafka.' + name + '.hosts')
  38. return KafkaProducer(bootstrap_servers=hosts,
  39. api_version=(0, 10),
  40. retries=3)
  41. def get_kafka_consumer(self, group_id, name='base'):
  42. hosts = self.env.get_val('kafka.' + name + '.hosts')
  43. return KafkaConsumer(bootstrap_servers=hosts,
  44. group_id=group_id,
  45. api_version=(0, 10, 2),
  46. session_timeout_ms=25000,
  47. max_poll_records=100,
  48. fetch_max_bytes=1 * 1024 * 1024)
  49. def get_kafka_consumer_by_reset_offset(self, topic, group_id, name='base', partitions=None):
  50. consumer = self.get_kafka_consumer(group_id=group_id, name=name)
  51. if not partitions:
  52. p = consumer.partitions_for_topic(topic=topic)
  53. partitions = [TopicPartition(topic=topic, partition=i) for i in p]
  54. consumer.assign(partitions)
  55. for i in partitions:
  56. consumer.seek(i, 0)
  57. return consumer
  58. def get_mysql_client(self, db, name='test_firefly'):
  59. host = self.env.get_val('mysql.' + name + '.host')
  60. port = self.env.get_val('mysql.' + name + '.port')
  61. user = self.env.get_val('mysql.' + name + '.user')
  62. pwd = self.env.get_val('mysql.' + name + '.pwd')
  63. return pymysql.connect(host=host, port=int(port), user=user, passwd=pwd, db=db, charset='utf8')
  64. def get_holo_client(self, name='base', db=None):
  65. host = self.env.get_val('holo.' + name + '.host')
  66. port = self.env.get_val('holo.' + name + '.port')
  67. if not db:
  68. db = self.env.get_val('holo.' + name + '.dbname')
  69. user = self.env.get_val('holo.' + name + '.username')
  70. pwd = self.env.get_val('holo.' + name + '.pwd')
  71. return psycopg2.connect(host=host, port=int(port), dbname=db, user=user, password=pwd)
  72. pass
  73. def get_rabbit_mq_sdk(self, name='base'):
  74. host = self.env.get_val('rabbit_mq.' + name + '.host')
  75. return RabbitMQ(username='whc', password='whc', host=host, port=5672, virtual_host='/')
  76. pass
  77. def get_all_client():
  78. return WinhcAllClient()
  79. pass
  80. def get_odps_sdk(project='winhc_ng'):
  81. return ODPS('LTAI4FynxS5nNuKyZ3LHhMX5', 'r6gWoySXC8kSK4qnfKRxEuWJ5uHIiE', project,
  82. endpoint='http://service.odps.aliyun.com/api')
  83. if __name__ == '__main__':
  84. pass