# MongoDbFastScan.py (2.3 KB)
# -*- coding: utf-8 -*-
# @Time : 2021/12/29 13:48
# @Author : XuJiakai
# @File : MongoDbFastScan
# @Software: PyCharm
  6. import threading
  7. import math
  8. from log import get_log
  9. log = get_log('MongoDbFastScan')
  10. class MongoDbFastScan:
  11. def __init__(self, db):
  12. self.collection_name = None
  13. self.batch_size = 500
  14. self.thread_num = 1
  15. self.data_num = None
  16. self.db = db
  17. self.func = None
  18. def __start_thread(self, skip, limit):
  19. db = self.db
  20. skip = int(skip)
  21. limit = int(limit)
  22. li = []
  23. for i in db.get_collection(self.collection_name).find(no_cursor_timeout=True).skip(skip).limit(
  24. limit).batch_size(self.batch_size):
  25. li.append(i)
  26. if len(li) >= self.batch_size:
  27. log.info("start:{} end:{} handle ...".format(skip, skip + limit))
  28. self.func(li)
  29. li = []
  30. pass
  31. if len(li) != 0:
  32. log.info("start:{} end:{} handle ...".format(skip, skip + limit))
  33. self.func(li)
  34. pass
  35. log.info("start:{} end:{} success".format(skip, skip + limit))
  36. pass
  37. def scan(self, collection_name, func, thread_num=1, batch_size=500, skip=0):
  38. self.collection_name = collection_name
  39. self.func = func
  40. db = self.db
  41. count = db.get_collection(collection_name).count_documents(filter={})
  42. if count == 0:
  43. return
  44. self.data_num = math.ceil(count / thread_num) * thread_num
  45. log.info("data number: {} thread num: {}".format(self.data_num, self.thread_num))
  46. self.thread_num = thread_num
  47. self.batch_size = batch_size
  48. skip_value = skip % (self.data_num / thread_num)
  49. skip_continue = skip / (self.data_num / thread_num)
  50. for i in range(thread_num):
  51. if i < skip_continue:
  52. continue
  53. else:
  54. ttt = threading.Thread(target=self.__start_thread, args=(
  55. i * (self.data_num / thread_num) + skip_value, self.data_num / thread_num - skip_value))
  56. ttt.start()
  57. pass
  58. pass
  59. pass
  60. if __name__ == '__main__':
  61. m = MongoDbFastScan(None)
  62. def func(li):
  63. print(len(li))
  64. pass
  65. m.scan("a_xjk_recleanup_hk_20211221", func, thread_num=5)
  66. pass