# -*- coding: utf-8 -*-
# @Time : 2021/5/27 14:31
# @Author : XuJiakai
# @File : WinhcAllClient
# @Software: PyCharm
import os

import psycopg2
import pymysql
from elasticsearch import Elasticsearch
from kafka import KafkaConsumer, TopicPartition
from kafka import KafkaProducer
from odps import ODPS
from pymongo import MongoClient

from env.environment_switch import environment_switch
from sdk.WinhcHbaseApi import WinhcHbaseApi
class WinhcAllClient:
    """Factory for the storage and messaging clients used by the project.

    Connection settings are read from the configuration object returned by
    ``environment_switch()`` using dotted keys such as ``es.<name>.hosts``.
    Every ``get_*`` call builds and returns a new client; nothing is cached
    on the instance.
    """

    def __init__(self):
        self.env = environment_switch()

    def _conf_val(self, section, name, key):
        """Return the configuration value stored under ``<section>.<name>.<key>``."""
        return self.env.get_val('.'.join((section, name, key)))

    def get_hbase_client(self):
        """Return a ``WinhcHbaseApi`` built from ``winhc_open_api.eci_data.host``."""
        host = self.env.get_val('winhc_open_api.eci_data.host')
        return WinhcHbaseApi(host)

    def get_es_client(self, es_name='new'):
        """Return an ``Elasticsearch`` client for the cluster named *es_name*.

        ``es.<es_name>.hosts`` is a comma-separated host list; the username
        and password from the same section are used for HTTP basic auth.
        SSL is disabled.
        """
        hosts = self._conf_val('es', es_name, 'hosts')
        username = self._conf_val('es', es_name, 'username')
        password = self._conf_val('es', es_name, 'pwd')
        return Elasticsearch(hosts.split(','), http_auth=(username, password), use_ssl=False)

    def get_mongo_db(self, db='itslaw', mongo_name='itslaw'):
        """Return database *db* from the MongoDB deployment named *mongo_name*.

        A new ``MongoClient`` is created from ``mongo.<mongo_name>.url`` on
        every call.
        """
        url = self._conf_val('mongo', mongo_name, 'url')
        return MongoClient(url)[db]

    def get_mongo_collection(self, coll_name, db='itslaw', mongo_name='itslaw'):
        """Return collection *coll_name* of database *db* (see ``get_mongo_db``)."""
        return self.get_mongo_db(db=db, mongo_name=mongo_name)[coll_name]

    def get_kafka_sender(self, name='base'):
        """Return a ``KafkaProducer`` for the cluster at ``kafka.<name>.hosts``."""
        hosts = self._conf_val('kafka', name, 'hosts')
        return KafkaProducer(bootstrap_servers=hosts,
                             api_version=(0, 10),
                             retries=3)

    def get_kafka_consumer(self, group_id, name='base'):
        """Return an unsubscribed ``KafkaConsumer`` in consumer group *group_id*.

        The consumer is connected to ``kafka.<name>.hosts``; the caller is
        expected to subscribe to or assign partitions itself.
        """
        hosts = self._conf_val('kafka', name, 'hosts')
        return KafkaConsumer(bootstrap_servers=hosts,
                             group_id=group_id,
                             api_version=(0, 10, 2),
                             session_timeout_ms=25000,
                             max_poll_records=100,
                             fetch_max_bytes=1 * 1024 * 1024)

    def get_kafka_consumer_by_reset_offset(self, topic, group_id, name='base', partitions=None):
        """Return a consumer assigned to *topic* and rewound to its beginning.

        Args:
            topic: Topic to read.
            group_id: Consumer group id handed to ``get_kafka_consumer``.
            name: Configuration name of the Kafka cluster.
            partitions: Optional ``TopicPartition`` objects to assign. When
                empty or ``None``, every partition of *topic* is assigned.

        Returns:
            The ``KafkaConsumer`` with the partitions manually assigned.

        Raises:
            ValueError: If no partitions are supplied and the cluster reports
                none for *topic* (for example, the topic does not exist).
        """
        consumer = self.get_kafka_consumer(group_id=group_id, name=name)
        if not partitions:
            partition_ids = consumer.partitions_for_topic(topic=topic)
            if not partition_ids:
                # The lookup yields nothing for an unknown topic. Release the
                # connection we just opened instead of leaking it, and fail
                # with a message that names the topic.
                consumer.close()
                raise ValueError('no partitions found for kafka topic: %s' % topic)
            partitions = [TopicPartition(topic=topic, partition=i) for i in partition_ids]
        consumer.assign(partitions)
        # Rewind to the oldest offset that still exists. Seeking to a literal
        # offset 0 is out of range once retention or compaction has removed
        # the head of the log, and the consumer then falls back to its
        # auto_offset_reset policy instead of replaying the backlog.
        consumer.seek_to_beginning(*partitions)
        return consumer

    def get_mysql_client(self, db, name='test_firefly'):
        """Return a ``pymysql`` connection to database *db* on ``mysql.<name>``."""
        host = self._conf_val('mysql', name, 'host')
        port = self._conf_val('mysql', name, 'port')
        user = self._conf_val('mysql', name, 'user')
        pwd = self._conf_val('mysql', name, 'pwd')
        return pymysql.connect(host=host, port=int(port), user=user, passwd=pwd, db=db, charset='utf8')

    def get_holo_client(self, name='base', db=None):
        """Return a ``psycopg2`` connection to the ``holo.<name>`` endpoint.

        When *db* is empty or ``None`` the database name is taken from
        ``holo.<name>.dbname``.
        """
        host = self._conf_val('holo', name, 'host')
        port = self._conf_val('holo', name, 'port')
        if not db:
            db = self._conf_val('holo', name, 'dbname')
        user = self._conf_val('holo', name, 'username')
        pwd = self._conf_val('holo', name, 'pwd')
        return psycopg2.connect(host=host, port=int(port), dbname=db, user=user, password=pwd)
def get_all_client():
    """Create and return a new ``WinhcAllClient``."""
    client = WinhcAllClient()
    return client
def get_odps_sdk(project='winhc_ng', access_id=None, secret_access_key=None,
                 endpoint='http://service.odps.aliyun.com/api'):
    """Return an ``ODPS`` entry object for *project*.

    Credentials are resolved in this order:

    1. *access_id* and *secret_access_key*, when both are passed;
    2. the ``ALIBABA_CLOUD_ACCESS_KEY_ID`` / ``ALIBABA_CLOUD_ACCESS_KEY_SECRET``
       environment variables, when both are set and non-empty;
    3. the legacy key pair embedded below.

    Args:
        project: ODPS project name.
        access_id: Optional access key id; must be paired with
            *secret_access_key*.
        secret_access_key: Optional access key secret; must be paired with
            *access_id*.
        endpoint: ODPS service endpoint URL.

    Raises:
        ValueError: If only one of *access_id* / *secret_access_key* is given.
    """
    if (access_id is None) != (secret_access_key is None):
        # Never mix half of an explicit pair with half of a fallback pair.
        raise ValueError('access_id and secret_access_key must be passed together')

    if access_id is None:
        env_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID')
        env_secret = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
        if env_id and env_secret:
            access_id, secret_access_key = env_id, env_secret
        else:
            # SECURITY NOTE(review): this key pair is committed to source
            # control and is kept only so existing callers keep working.
            # It should be rotated and this fallback removed.
            access_id = 'LTAI4FynxS5nNuKyZ3LHhMX5'
            secret_access_key = 'r6gWoySXC8kSK4qnfKRxEuWJ5uHIiE'

    return ODPS(access_id, secret_access_key, project, endpoint=endpoint)
if __name__ == '__main__':
    # Intentionally a no-op: running this module as a script does nothing.
    pass
|