1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980 |
- # -*- coding: utf-8 -*-
- # @Time : 2021/12/29 13:48
- # @Author : XuJiakai
- # @File : MongoDbFastScan
- # @Software: PyCharm
- import threading
- import math
- from log import get_log
- log = get_log('MongoDbFastScan')
class MongoDbFastScan:
    """Scan a MongoDB collection with several threads, handing documents to a callback in batches.

    The collection is split into ``thread_num`` contiguous skip/limit ranges
    and each range is read by its own thread.  The state of the most recent
    ``scan`` call is kept on the instance (``collection_name``, ``func``,
    ``thread_num``, ``batch_size``, ``data_num``), so one instance should not
    run two scans at the same time.
    """

    def __init__(self, db):
        """Remember the database handle; every other setting is filled in by ``scan``.

        :param db: object exposing ``get_collection(name)``
                   (presumably a pymongo ``Database`` -- confirm with callers).
        """
        self.collection_name = None
        self.batch_size = 500
        self.thread_num = 1
        self.data_num = None
        self.db = db
        self.func = None

    def __start_thread(self, skip, limit):
        """Worker body: read ``limit`` documents starting at offset ``skip``.

        Documents are accumulated into lists of at most ``self.batch_size``
        items and each list is passed to ``self.func``; a final partial list
        is flushed at the end of the range.

        :param skip: offset of the first document of this range.
        :param limit: number of documents in this range.
        """
        skip = int(skip)
        limit = int(limit)
        if limit <= 0:
            # pymongo treats limit(0) as "no limit"; an empty range must not
            # turn into a scan of the rest of the collection.
            return
        end = skip + limit

        # NOTE(review): no sort is applied, so the ranges of different threads
        # only tile the collection if the server returns documents in a
        # stable order across queries -- confirm for the target deployment.
        cursor = (
            self.db.get_collection(self.collection_name)
            .find(no_cursor_timeout=True)
            .skip(skip)
            .limit(limit)
            .batch_size(self.batch_size)
        )
        batch = []
        try:
            for doc in cursor:
                batch.append(doc)
                if len(batch) >= self.batch_size:
                    log.info("start:{} end:{} handle ...".format(skip, end))
                    self.func(batch)
                    batch = []
            if batch:
                log.info("start:{} end:{} handle ...".format(skip, end))
                self.func(batch)
        finally:
            # The cursor was opened with no_cursor_timeout=True, so it is
            # released explicitly even when the callback raises.
            cursor.close()
        log.info("start:{} end:{} success".format(skip, end))

    def scan(self, collection_name, func, thread_num=1, batch_size=500, skip=0):
        """Start the worker threads that scan ``collection_name``.

        The method returns as soon as the threads are started; it does not
        wait for them to finish.  ``func`` is invoked from the worker
        threads, possibly concurrently.

        :param collection_name: name of the collection to read.
        :param func: callable receiving one list of documents per batch.
        :param thread_num: number of ranges / worker threads (>= 1).
        :param batch_size: maximum number of documents per ``func`` call; also
                           used as the cursor batch size.
        :param skip: number of leading documents (in scan order) to leave
                     out, e.g. to resume an interrupted scan.
        :raises ValueError: if ``thread_num`` < 1 or ``skip`` < 0.
        """
        if thread_num < 1:
            raise ValueError("thread_num must be >= 1")
        if skip < 0:
            raise ValueError("skip must be >= 0")

        self.collection_name = collection_name
        self.func = func

        count = self.db.get_collection(collection_name).count_documents(filter={})
        if count == 0:
            return

        # Assign the settings before logging so the message reports the
        # values of this call rather than those of a previous one.
        self.thread_num = thread_num
        self.batch_size = batch_size

        # Integer ceiling division: size of the range handled by one thread.
        chunk = (count + thread_num - 1) // thread_num
        # Document count rounded up to a multiple of thread_num.
        self.data_num = chunk * thread_num
        log.info("data number: {} thread num: {}".format(self.data_num, self.thread_num))

        # Ranges entirely below the resume point are not started at all; only
        # the range containing the resume point is shortened.
        first_chunk, offset = divmod(int(skip), chunk)
        for index in range(first_chunk, thread_num):
            start = index * chunk
            length = chunk
            if index == first_chunk:
                start += offset
                length -= offset
            worker = threading.Thread(target=self.__start_thread, args=(start, length))
            worker.start()
if __name__ == '__main__':
    # Manual smoke run: scan one collection with five threads and print the
    # size of every batch handed to the callback.
    # NOTE(review): the scanner is built with db=None, so scan() will fail on
    # its first collection access unless a real database handle is supplied.
    def report_batch_size(batch):
        """Print how many documents the batch contains."""
        print(len(batch))

    scanner = MongoDbFastScan(None)
    scanner.scan("a_xjk_recleanup_hk_20211221", report_batch_size, thread_num=5)
|