# -*- coding: utf-8 -*-
# @Time : 2021/12/29 13:48
# @Author : XuJiakai
# @File : MongoDbFastScan
# @Software: PyCharm
import threading
import math

from log import get_log

log = get_log('MongoDbFastScan')


class MongoDbFastScan:
    """Scan a MongoDB collection with several threads, feeding document batches to a callback.

    The collection is split into ``thread_num`` contiguous ``skip``/``limit``
    windows over the ``_id`` ordering, and each window is read by its own thread.
    """

    def __init__(self, db):
        """
        :param db: database handle exposing ``get_collection(name)``
            (presumably a pymongo ``Database`` -- confirm against callers).
        """
        self.collection_name = None  # set by scan()
        self.batch_size = 500        # max documents per func() call
        self.thread_num = 1
        self.data_num = None         # document count rounded up to a multiple of thread_num
        self.db = db
        self.func = None             # batch callback, set by scan()

    @staticmethod
    def _split_ranges(count, thread_num, skip=0):
        """Return the ``(skip, limit)`` window each worker thread should read.

        The ``count`` documents are divided into ``thread_num`` chunks of
        ``ceil(count / thread_num)`` documents each. ``skip`` is a global offset:
        chunks that end at or before it are dropped entirely, and the chunk that
        contains it starts at ``skip`` instead of at its own beginning.

        :param count: number of documents in the collection.
        :param thread_num: number of chunks (>= 1).
        :param skip: number of leading documents to skip (e.g. a resume point).
        :return: list of ``(skip, limit)`` pairs in chunk order; every limit is > 0.
        """
        chunk = math.ceil(count / thread_num)
        ranges = []
        for i in range(thread_num):
            chunk_start = i * chunk
            chunk_end = chunk_start + chunk
            start = max(chunk_start, skip)
            if start >= chunk_end:
                # Whole chunk lies before the resume point. Also guarantees we never
                # emit limit == 0, which MongoDB would interpret as "no limit".
                continue
            ranges.append((start, chunk_end - start))
        return ranges

    def __start_thread(self, skip, limit):
        """Read ``limit`` documents starting at offset ``skip`` and pass them to
        ``self.func`` in lists of at most ``self.batch_size`` documents.

        Runs in a worker thread started by :meth:`scan`.
        """
        skip = int(skip)
        limit = int(limit)
        end = skip + limit
        collection = self.db.get_collection(self.collection_name)
        # Sort on the unique _id so every thread slices the same, stable ordering;
        # without a sort, separate skip/limit queries may overlap or miss documents.
        cursor = (collection.find(no_cursor_timeout=True)
                  .sort('_id', 1)
                  .skip(skip)
                  .limit(limit)
                  .batch_size(self.batch_size))
        li = []
        try:
            for doc in cursor:
                li.append(doc)
                if len(li) >= self.batch_size:
                    log.info("start:{} end:{} handle ...".format(skip, end))
                    self.func(li)
                    li = []
        finally:
            # A no_cursor_timeout cursor is not reaped by the server, so close it
            # explicitly, even when func raises.
            cursor.close()
        if li:
            log.info("start:{} end:{} handle ...".format(skip, end))
            self.func(li)
        log.info("start:{} end:{} success".format(skip, end))

    def scan(self, collection_name, func, thread_num=1, batch_size=500, skip=0, wait=False):
        """Start worker threads that stream ``collection_name`` through ``func``.

        :param collection_name: name of the collection to scan.
        :param func: callback receiving a non-empty list of documents per batch.
            It is called from several threads concurrently, so it must be thread-safe.
        :param thread_num: number of worker threads (>= 1).
        :param batch_size: maximum documents per ``func`` call; also used as the
            cursor batch size.
        :param skip: number of leading documents (in ``_id`` order) to skip,
            e.g. to resume an interrupted scan.
        :param wait: if True, block until every worker thread has finished.
        :return: list of the started ``threading.Thread`` objects
            (empty when there is nothing to scan).
        :raises ValueError: if ``thread_num`` is less than 1.
        """
        if thread_num < 1:
            raise ValueError("thread_num must be >= 1, got {}".format(thread_num))
        self.collection_name = collection_name
        self.func = func
        # Stored before logging so the log line reports this scan's values.
        self.thread_num = thread_num
        self.batch_size = batch_size
        count = self.db.get_collection(collection_name).count_documents(filter={})
        if count == 0:
            return []
        self.data_num = math.ceil(count / thread_num) * thread_num
        log.info("data number: {} thread num: {}".format(self.data_num, self.thread_num))
        threads = []
        for start, limit in self._split_ranges(count, thread_num, skip):
            worker = threading.Thread(target=self.__start_thread, args=(start, limit))
            worker.start()
            threads.append(worker)
        if wait:
            for worker in threads:
                worker.join()
        return threads


if __name__ == '__main__':
    # NOTE: scan() calls db.get_collection(), so a real database handle must
    # replace None for this demo to run.
    m = MongoDbFastScan(None)

    def func(li):
        print(len(li))

    m.scan("a_xjk_recleanup_hk_20211221", func, thread_num=5)