TestMain.py 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2023/7/20 16:50
  3. # @Author : XuJiakai
  4. # @File : TestMain
  5. # @Software: PyCharm
  6. import asyncio
  7. import json
  8. from data_clean.api.http_api import get_json
  9. from data_clean.utils.async_client import get_aio_kafka_producer
  10. async def get_test_data():
  11. tn = "company_court_open_announcement"
  12. url = f"http://47.101.221.131:8288/hbase/get/ng_rt_{tn}/8fd218fc8461789c4c401eb1eaa3d723"
  13. res = await get_json(url)
  14. res = json.loads(json.dumps(res['data']).lower())
  15. res["plaintiff_info"] = '[{"name":"季韩旭","litigant_id":""},{"name":"戴莉","litigant_id":""}]'
  16. data = {
  17. "data": {
  18. tn: [
  19. res
  20. ]
  21. }
  22. }
  23. return data
  24. async def test_send_kafka():
  25. producer = get_aio_kafka_producer()
  26. await producer.start()
  27. data = await get_test_data()
  28. res = await producer.send_and_wait("source_topic", json.dumps(data).encode())
  29. print(res)
  30. await producer.stop()
  31. pass
  32. async def test_for_url():
  33. data = await get_test_data()
  34. print("receive : ", data)
  35. from data_clean.task_distributor import task_distribute
  36. result_data = await task_distribute(data)
  37. print("send : ", result_data)
  38. pass
  39. if __name__ == '__main__':
  40. # asyncio.run(test_send_kafka())
  41. asyncio.run(test_for_url())
  42. pass