# -*- coding: utf-8 -*-
# @Time : 2021/5/27 14:31
# @Author : XuJiakai
# @File : WinhcAllClient
# @Software: PyCharm
"""Factories that build service clients from the active environment config."""
import psycopg2
import pymysql
from elasticsearch import Elasticsearch
from kafka import KafkaConsumer, TopicPartition
from kafka import KafkaProducer
from odps import ODPS
from pymongo import MongoClient

from env.environment_switch import environment_switch
from sdk.WinhcHbaseApi import WinhcHbaseApi
from sdk.RabbitMQ import RabbitMQ


class WinhcAllClient:
    """Build clients for the backing services used by the project.

    Connection settings are read through ``self.env.get_val`` using keys of
    the form ``<section>.<name>.<field>``, where ``name`` selects one of the
    configured instances of that service.
    """

    def __init__(self):
        self.env = environment_switch()

    def _conf(self, section, name, field):
        """Return the config value stored under ``<section>.<name>.<field>``."""
        return self.env.get_val(section + '.' + name + '.' + field)

    def get_hbase_client(self):
        """Return a ``WinhcHbaseApi`` bound to the configured eci_data host."""
        host = self.env.get_val('winhc_open_api.eci_data.host')
        return WinhcHbaseApi(host)

    def get_es_client(self, es_name='new'):
        """Return an ``Elasticsearch`` client for the ``es.<es_name>`` config.

        The ``hosts`` value is split on commas; HTTP basic auth is used and
        SSL is disabled.
        """
        hosts = self._conf('es', es_name, 'hosts')
        username = self._conf('es', es_name, 'username')
        password = self._conf('es', es_name, 'pwd')
        return Elasticsearch(
            hosts.split(','),
            http_auth=(username, password),
            use_ssl=False,
        )

    def get_mongo_db(self, db='itslaw', mongo_name='itslaw'):
        """Return database ``db`` from the ``mongo.<mongo_name>.url`` server."""
        url = self._conf('mongo', mongo_name, 'url')
        return MongoClient(url)[db]

    def get_mongo_collection(self, coll_name, db='itslaw', mongo_name='itslaw'):
        """Return collection ``coll_name`` of database ``db``."""
        # Use a separate local instead of rebinding the ``db`` parameter.
        database = self.get_mongo_db(db=db, mongo_name=mongo_name)
        return database[coll_name]

    def get_kafka_sender(self, name='base'):
        """Return a ``KafkaProducer`` for the ``kafka.<name>.hosts`` brokers."""
        hosts = self._conf('kafka', name, 'hosts')
        return KafkaProducer(
            bootstrap_servers=hosts,
            api_version=(0, 10),
            retries=3,
        )

    def get_kafka_consumer(self, group_id, name='base'):
        """Return an unsubscribed ``KafkaConsumer`` in group ``group_id``.

        The consumer is created without topics; the caller is expected to
        subscribe or assign partitions afterwards.
        """
        hosts = self._conf('kafka', name, 'hosts')
        return KafkaConsumer(
            bootstrap_servers=hosts,
            group_id=group_id,
            api_version=(0, 10, 2),
            session_timeout_ms=25000,
            max_poll_records=100,
            fetch_max_bytes=1 * 1024 * 1024,
        )

    def get_kafka_consumer_by_reset_offset(self, topic, group_id, name='base',
                                           partitions=None):
        """Return a consumer assigned to ``topic`` and positioned at offset 0.

        Args:
            topic: Topic whose partitions are looked up when ``partitions``
                is empty.
            group_id: Consumer group id passed to ``get_kafka_consumer``.
            name: Kafka config name (``kafka.<name>.hosts``).
            partitions: Optional list of ``TopicPartition`` to assign. When
                falsy, every partition reported for ``topic`` is used.

        Raises:
            ValueError: If no partition metadata is available for ``topic``.
        """
        consumer = self.get_kafka_consumer(group_id=group_id, name=name)
        if not partitions:
            partition_ids = consumer.partitions_for_topic(topic=topic)
            if partition_ids is None:
                # Previously this surfaced as "TypeError: 'NoneType' object is
                # not iterable" and left the new consumer open.
                consumer.close()
                raise ValueError(
                    'no partitions found for kafka topic: %r' % (topic,))
            partitions = [
                TopicPartition(topic=topic, partition=pid)
                for pid in sorted(partition_ids)
            ]
        consumer.assign(partitions)
        for tp in partitions:
            # NOTE(review): this seeks to absolute offset 0, not to the
            # earliest retained offset; consider seek_to_beginning if old
            # segments may have been deleted -- confirm intended behaviour.
            consumer.seek(tp, 0)
        return consumer

    def get_mysql_client(self, db, name='test_firefly'):
        """Return a ``pymysql`` connection to ``db`` using ``mysql.<name>``."""
        host = self._conf('mysql', name, 'host')
        port = self._conf('mysql', name, 'port')
        user = self._conf('mysql', name, 'user')
        pwd = self._conf('mysql', name, 'pwd')
        return pymysql.connect(
            host=host,
            port=int(port),
            user=user,
            passwd=pwd,
            db=db,
            charset='utf8',
        )

    def get_holo_client(self, name='base', db=None):
        """Return a ``psycopg2`` connection using the ``holo.<name>`` config.

        When ``db`` is falsy the database name is read from
        ``holo.<name>.dbname``.
        """
        host = self._conf('holo', name, 'host')
        port = self._conf('holo', name, 'port')
        if not db:
            db = self._conf('holo', name, 'dbname')
        user = self._conf('holo', name, 'username')
        pwd = self._conf('holo', name, 'pwd')
        return psycopg2.connect(
            host=host,
            port=int(port),
            dbname=db,
            user=user,
            password=pwd,
        )

    def get_rabbit_mq_sdk(self, name='base'):
        """Return a ``RabbitMQ`` SDK object for the ``rabbit_mq.<name>`` host."""
        host = self._conf('rabbit_mq', name, 'host')
        # NOTE(review): username/password/port are hard-coded here rather
        # than read from the environment config like the other clients.
        return RabbitMQ(
            username='whc',
            password='whc',
            host=host,
            port=5672,
            virtual_host='/',
        )


def get_all_client():
    """Return a new ``WinhcAllClient``."""
    return WinhcAllClient()


def get_odps_sdk(project='winhc_ng'):
    """Return an ``ODPS`` client for ``project`` on the public endpoint."""
    # SECURITY NOTE(review): the access key id/secret are committed in source.
    # They are kept unchanged so existing callers keep working, but should be
    # rotated and loaded from configuration or a secret store.
    return ODPS(
        'LTAI4FynxS5nNuKyZ3LHhMX5',
        'r6gWoySXC8kSK4qnfKRxEuWJ5uHIiE',
        project,
        endpoint='http://service.odps.aliyun.com/api',
    )


if __name__ == '__main__':
    pass