|
@@ -0,0 +1,103 @@
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
+# @Time : 2021/5/27 14:31
|
|
|
+# @Author : XuJiakai
|
|
|
+# @File : WinhcAllClient
|
|
|
+# @Software: PyCharm
|
|
|
+import psycopg2
|
|
|
+import pymysql
|
|
|
+from elasticsearch import Elasticsearch
|
|
|
+from kafka import KafkaConsumer, TopicPartition
|
|
|
+from kafka import KafkaProducer
|
|
|
+from odps import ODPS
|
|
|
+from pymongo import MongoClient
|
|
|
+
|
|
|
+from env.environment_switch import environment_switch
|
|
|
+from sdk.WinhcHbaseApi import WinhcHbaseApi
|
|
|
+
|
|
|
+
|
|
|
+class WinhcAllClient:
|
|
|
+ def __init__(self):
|
|
|
+ self.env = environment_switch()
|
|
|
+ pass
|
|
|
+
|
|
|
+ def get_hbase_client(self):
|
|
|
+ host = self.env.get_val('winhc_open_api.eci_data.host')
|
|
|
+ return WinhcHbaseApi(host)
|
|
|
+
|
|
|
+ def get_es_client(self, es_name='new'):
|
|
|
+ hosts = self.env.get_val('es.' + es_name + '.hosts')
|
|
|
+ username = self.env.get_val('es.' + es_name + '.username')
|
|
|
+ password = self.env.get_val('es.' + es_name + '.pwd')
|
|
|
+ use_ssl = False
|
|
|
+ return Elasticsearch(hosts.split(','), http_auth=(username, password), use_ssl=use_ssl)
|
|
|
+
|
|
|
+ def get_mongo_db(self, db='itslaw', mongo_name='itslaw'):
|
|
|
+ url = self.env.get_val('mongo.' + mongo_name + '.url')
|
|
|
+ c = MongoClient(url)
|
|
|
+ return c[db]
|
|
|
+
|
|
|
+ def get_mongo_collection(self, coll_name, db='itslaw', mongo_name='itslaw'):
|
|
|
+ db = self.get_mongo_db(db=db, mongo_name=mongo_name)
|
|
|
+ return db[coll_name]
|
|
|
+
|
|
|
+ def get_kafka_sender(self, name='base'):
|
|
|
+ hosts = self.env.get_val('kafka.' + name + '.hosts')
|
|
|
+ return KafkaProducer(bootstrap_servers=hosts,
|
|
|
+ api_version=(0, 10),
|
|
|
+ retries=3)
|
|
|
+
|
|
|
+ def get_kafka_consumer(self, group_id, name='base'):
|
|
|
+ hosts = self.env.get_val('kafka.' + name + '.hosts')
|
|
|
+ return KafkaConsumer(bootstrap_servers=hosts,
|
|
|
+ group_id=group_id,
|
|
|
+ api_version=(0, 10, 2),
|
|
|
+ session_timeout_ms=25000,
|
|
|
+ max_poll_records=100,
|
|
|
+ fetch_max_bytes=1 * 1024 * 1024)
|
|
|
+
|
|
|
+ def get_kafka_consumer_by_reset_offset(self, topic, group_id, name='base', partitions=None):
|
|
|
+ consumer = self.get_kafka_consumer(group_id=group_id, name=name)
|
|
|
+
|
|
|
+ if not partitions:
|
|
|
+ p = consumer.partitions_for_topic(topic=topic)
|
|
|
+ partitions = [TopicPartition(topic=topic, partition=i) for i in p]
|
|
|
+
|
|
|
+ consumer.assign(partitions)
|
|
|
+ for i in partitions:
|
|
|
+ consumer.seek(i, 0)
|
|
|
+ return consumer
|
|
|
+
|
|
|
+ def get_mysql_client(self, db, name='test_firefly'):
|
|
|
+ host = self.env.get_val('mysql.' + name + '.host')
|
|
|
+ port = self.env.get_val('mysql.' + name + '.port')
|
|
|
+ user = self.env.get_val('mysql.' + name + '.user')
|
|
|
+ pwd = self.env.get_val('mysql.' + name + '.pwd')
|
|
|
+ return pymysql.connect(host=host, port=int(port), user=user, passwd=pwd, db=db, charset='utf8')
|
|
|
+
|
|
|
+ def get_holo_client(self, name='base', db=None):
|
|
|
+ host = self.env.get_val('holo.' + name + '.host')
|
|
|
+ port = self.env.get_val('holo.' + name + '.port')
|
|
|
+ if not db:
|
|
|
+ db = self.env.get_val('holo.' + name + '.dbname')
|
|
|
+ user = self.env.get_val('holo.' + name + '.username')
|
|
|
+ pwd = self.env.get_val('holo.' + name + '.pwd')
|
|
|
+ return psycopg2.connect(host=host, port=int(port), dbname=db, user=user, password=pwd)
|
|
|
+ pass
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+def get_all_client():
|
|
|
+ return WinhcAllClient()
|
|
|
+ pass
|
|
|
+
|
|
|
+
|
|
|
+def get_odps_sdk(project='winhc_ng'):
|
|
|
+ return ODPS('LTAI4FynxS5nNuKyZ3LHhMX5', 'r6gWoySXC8kSK4qnfKRxEuWJ5uHIiE', project,
|
|
|
+ endpoint='http://service.odps.aliyun.com/api')
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ pass
|