```python
from multiprocessing import Process
import os
import math

def work_fun(work_list):
    pass

def distrib_works(work_list, process_num):
    # split the full work list into process_num groups of (roughly) equal size
    group_length = math.ceil(len(work_list) / process_num)
    return [work_list[(i * group_length):((i + 1) * group_length)] for i in range(process_num)]

work_list = os.listdir("../data")
process_num = 4
group_list = distrib_works(work_list, process_num)

process_list = [Process(target=work_fun, args=(g,)) for g in group_list]
for p in process_list:
    p.daemon = True
    p.start()
for p in process_list:
    p.join()
```
Defining a process as a class
Using Python's object-oriented features, we can create a class that inherits from Process and stores some data at construction time, so it no longer has to be passed in when the process runs. For example, when we use the tqdm library in a multi-process program to display progress bars, its position parameter specifies where the current bar is drawn in the console; that value can be stored directly in the process class instead of being supplied on every call.

A process is defined as a class as follows:
```python
from multiprocessing import Process

class MyProcess(Process):
    position = 0
    works = None

    def __init__(self, position, works):
        Process.__init__(self)
        self.position = position
        self.works = works

    def run(self):
        pass
```
If the Process constructor called inside your own constructor does not specify a target, the process still defaults to executing run with no arguments, even if your run method declares additional parameters!
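A quick way to see this behaviour (a minimal sketch; the exact error text may vary across Python versions): if run declares an extra required parameter, the child process fails with a TypeError as soon as it starts, because run is always invoked without arguments.

```python
from multiprocessing import Process

class BadProcess(Process):
    # run() is always called with no arguments by the child process,
    # so this extra required parameter can never be supplied
    def run(self, works):
        print(works)

if __name__ == '__main__':
    p = BadProcess()
    p.start()   # the child raises TypeError: run() missing 1 required positional argument
    p.join()
```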
To create processes this way, simply replace the call to the Process constructor with a call to the MyProcess constructor. An example of creating processes in this style:
```python
import math
import os
import multiprocessing
from multiprocessing import Process
from typing import List

from tqdm import tqdm

def distrib_works(work_list: List[str], process_num) -> List[List[str]]:
    group_length = math.ceil(len(work_list) / process_num)
    # drop the empty groups that appear when len(work_list) is not a multiple of process_num
    return [g for g in
            [work_list[(i * group_length):((i + 1) * group_length)] for i in range(process_num)]
            if len(g) > 0]

class FineTargetTaxiProcess(Process):
    # the constructor stores the assigned file group as self.input_files,
    # the progress-bar slot as self.index, and the lock, like MyProcess above

    def find_target(self, lock, log_file):
        for filename in self.input_files:
            with open(filename) as in_file:
                for row in tqdm(in_file, ncols=80, position=self.index):
                    cells = row.split(",")
                    if int(cells[0]) == 11865:
                        print(row)

if __name__ == '__main__':
    LOG_FILE = r"E:\出租车点\上下车点\scripts\data\find_error.log"
    lock = multiprocessing.Lock()
    # ROOT_DIR = "../data/201502/temp"
    ROOT_DIR = r"E:\出租车点\201502\RawCSV"
    INPUT_FILES = [os.path.join(ROOT_DIR, f) for f in os.listdir(ROOT_DIR)]
    GROUP_LIST = distrib_works(INPUT_FILES, 4)
    PROCESS_LIST = [FineTargetTaxiProcess(element, i, lock)
                    for i, element in enumerate(GROUP_LIST)]
    for process in PROCESS_LIST:
        process.start()
```
The same search can also be written with a process pool instead of processes that are managed by hand:

```python
import os
from multiprocessing import Pool, Manager

from tqdm import tqdm

def find_target(filename, lock, log_file):
    with open(filename) as in_file:
        for row in tqdm(in_file, ncols=80):
            cells = row.split(",")
            if int(cells[0]) == 11865:
                with lock:
                    with open(log_file, mode="a") as log:
                        print(row, file=log)

if __name__ == '__main__':
    # a plain multiprocessing.Lock cannot be pickled into Pool tasks,
    # so a Manager lock is used here instead
    LOCK = Manager().Lock()
    LOG_FILE = r"E:\出租车点\上下车点\scripts\data\find_error.log"
    ROOT_DIR = r"E:\出租车点\201502\RawCSV"
    INPUT_FILES = [os.path.join(ROOT_DIR, f) for f in os.listdir(ROOT_DIR)]
    POOL = Pool(processes=6)
    for f in INPUT_FILES:
        POOL.apply_async(find_target, (f, LOCK, LOG_FILE))
    POOL.close()
    POOL.join()
```
The contention is most obvious when tqdm displays several progress bars at once. On Windows, because "tqdm cannot obtain the default lock", the console output gets jumbled; below is the output of one run of the program on Windows:
```
λ python3 find_errors.py
Process 0: 0it [00:00, ?it/s]
Process 1: 0it [00:00, ?it/s]
Process 0: 273516it [00:00, 523719.79it/s]
Process 0: 995883it [00:01, 510379.67it/s]
Process 0: 1107387it [00:02, 510326.10it/s]
Process 0: 1224813it [00:02, 512761.81it/s]
Process 0: 3483799it [00:06, 539191.83it/s]
Process 1: 3683852it [00:06, 571536.15it/s]
Process 0: 3550015it [00:06, 540296.03it/s]
Process 0: 3615558it [00:06, 540947.45it/s]
Process 0: 3742521it [00:06, 542112.37it/s]
```
On Linux, by contrast, the same program produces:
```
Process 0: 2045720it [00:03, 647073.52it/s]
Process 1: 2092184it [00:03, 661530.01it/s]
Process 2: 2065411it [00:03, 652446.31it/s]
Process 3: 2093610it [00:03, 661782.04it/s]
```
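When the bars do need to stay tidy across processes, tqdm's documentation suggests creating the lock in the parent and handing it to every worker explicitly; the sketch below follows that recipe (the worker function and iteration count are made up for illustration):

```python
from multiprocessing import Pool, RLock, freeze_support
from tqdm import tqdm

def progresser(index):
    # each worker reuses the lock installed by the Pool initializer,
    # so the bars no longer fight over the console
    for _ in tqdm(range(1_000_000), position=index, desc=f"Process {index}", ncols=80):
        pass

if __name__ == '__main__':
    freeze_support()                 # needed for frozen executables on Windows
    tqdm.set_lock(RLock())           # create one shared lock in the parent
    with Pool(processes=4, initializer=tqdm.set_lock, initargs=(tqdm.get_lock(),)) as pool:
        pool.map(progresser, range(4))
```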
When using a Lock, you can take the lock with a with statement, which releases it automatically once the block finishes; you can also take it with its acquire() method and release it with release().

Example code that takes the lock with a with statement:
```python
def run(self):
    for filename in self.input_files:
        with open(filename, encoding="GB2312") as in_file:
            for row in tqdm(in_file):
                cells = row.split(",")
                if int(cells[0]) == 11865:
                    with self.lock:
                        with open(TARGET_TAXI_FILE, mode="a") as log:
                            print(row, file=log)
```
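The same critical section can also be written with explicit acquire()/release() calls. A small standalone sketch (the function name append_row is made up for illustration); the try/finally makes sure the lock is released even if the write raises:

```python
from multiprocessing import Lock

def append_row(lock, target_file, row):
    lock.acquire()               # block until the lock is free
    try:
        with open(target_file, mode="a") as log:
            print(row, file=log)
    finally:
        lock.release()           # always release, even if the write fails
```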
When this code runs on Windows, the id of the lock inside the child process differs from the id of the lock passed in from the main process, whereas on Linux the two are the same. So in principle this code could go wrong on Windows. In practice, though, a file opened by one process could not be opened by another at the same time, so the program's results came out fine.
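The difference is easy to observe directly (a minimal sketch, not from the original code): on Windows the child is spawned and rebuilds the Lock from a pickled handle, so the two ids usually differ, while on Linux the forked child inherits the parent's memory and the ids match.

```python
from multiprocessing import Process, Lock

def child(lock):
    print("child  lock id:", id(lock))

if __name__ == '__main__':
    lock = Lock()
    print("parent lock id:", id(lock))
    p = Process(target=child, args=(lock,))
    p.start()
    p.join()
```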
If, in the earlier shared-variable code, the shared variable is changed from a plain list to a list created by a Manager object, the correct result is obtained.
```python
from multiprocessing import Process, Lock, Manager
import time

def work(lock, var, index):
    with lock:
        var.append(index)
        print(f"Process {index} appended {index}")

if __name__ == '__main__':
    var = Manager().list()
    lock = Lock()
    process_list = [Process(target=work, args=(lock, var, i)) for i in range(8)]
    for p in process_list:
        p.start()
    for p in process_list:
        p.join()
    print(var)
```
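For comparison, the plain-list version that the paragraph above refers to (reconstructed here as a sketch) fails silently: every child appends to its own copy of the list, so the parent still prints an empty list at the end.

```python
from multiprocessing import Process, Lock

def work(lock, var, index):
    with lock:
        var.append(index)       # only modifies this child's copy of the list

if __name__ == '__main__':
    var = []                    # a plain list is not shared between processes
    lock = Lock()
    process_list = [Process(target=work, args=(lock, var, i)) for i in range(8)]
    for p in process_list:
        p.start()
    for p in process_list:
        p.join()
    print(var)                  # prints [] -- the children's appends are lost
```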
Worker processes can also hand the matching rows to a single writer process through a multiprocessing.Queue:

```python
import os
import multiprocessing as mp
from queue import Full, Empty   # exceptions raised by non-blocking / timed queue calls

from tqdm import tqdm

class FineTargetTaxiProcess(mp.Process):
    # the constructor stores the assigned file group as self.input_files,
    # the progress-bar slot as self.index, and the shared queue

    def pick(self, queue):
        for filename in tqdm(self.input_files, ncols=80, position=self.index,
                             desc=f"Process {self.index}"):
            with open(filename, encoding="GB2312") as in_file:
                for row in in_file:
                    cells = row.split(",")
                    if int(cells[0]) == 11865:
                        try:
                            queue.put(",".join(cells), block=False)
                        except Full:
                            print("Queue full")

class PrinterProcess(mp.Process):
    # the constructor stores self.output_file, self.log_file, and the shared queue

    def write(self, queue):
        with open(self.output_file, mode="w", newline="\n") as printer, \
             open(self.log_file, mode="w") as log:
            while True:
                try:
                    row = queue.get(block=True, timeout=1)
                    print(row, file=printer)
                except Empty:
                    print("Queue empty", file=log)

if __name__ == '__main__':
    lock = mp.Lock()
    ROOT_DIR = r"/mnt/e/出租车点/201502/RawCSV"
    INPUT_FILES = [os.path.join(ROOT_DIR, f) for f in os.listdir(ROOT_DIR)]
    GROUP_LIST = distrib_works(INPUT_FILES, 6)
    QUEUE = mp.Queue()
    PROCESS_LIST = [FineTargetTaxiProcess(element, i, QUEUE)
                    for i, element in enumerate(GROUP_LIST)]
    PRINTER_PROCESS = PrinterProcess("./data/usequeue.txt", "./data/usequeue.log", QUEUE)
    for process in PROCESS_LIST:
        process.daemon = True
        process.start()
    PRINTER_PROCESS.daemon = True
    PRINTER_PROCESS.start()
    for p in PROCESS_LIST:
        p.join()
```
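Note that only the producer processes are joined here, and the writer is a daemon, so it is killed as soon as the main process exits; rows still sitting in the queue at that moment are lost. A common variation (a sketch, not part of the original code) is to push a sentinel such as None once the producers are done and let the writer drain the queue before exiting:

```python
# in the main block, after joining the producers:
QUEUE.put(None)              # sentinel: no more rows will arrive
PRINTER_PROCESS.join()       # wait for the writer to finish draining the queue

# and in the writer:
def write(self, queue):
    with open(self.output_file, mode="w", newline="\n") as printer:
        while True:
            row = queue.get()
            if row is None:  # sentinel seen: all producers are done
                break
            print(row, file=printer)
```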
The same producer/writer structure can also be built on a Pipe instead of a Queue:

```python
import os
import multiprocessing as mp

from tqdm import tqdm

class FineTargetTaxiProcess(mp.Process):
    # the constructor stores the assigned file group as self.input_files,
    # the progress-bar slot as self.index, and the sending end of the pipe

    def pick(self, pipe):
        for filename in tqdm(self.input_files, ncols=80, position=self.index,
                             desc=f"Process {self.index}"):
            with open(filename, encoding="GB2312") as in_file:
                for row in in_file:
                    cells = row.split(",")
                    if int(cells[0]) == 11865:
                        try:
                            pipe.send(",".join(cells))
                        except Exception:
                            print("Pipe send error")

class PrinterProcess(mp.Process):
    # the constructor stores self.output_file, self.log_file, and the receiving end

    def write(self, pipe):
        with open(self.output_file, mode="w", newline="\n") as printer, \
             open(self.log_file, mode="w") as log:
            while True:
                try:
                    row = pipe.recv()
                    print(row, file=printer)
                except Exception:
                    print("Pipe read error", file=log)

if __name__ == '__main__':
    lock = mp.Lock()
    # ROOT_DIR = "../data/201502/temp"
    ROOT_DIR = r"/mnt/e/出租车点/201502/RawCSV"
    INPUT_FILES = [os.path.join(ROOT_DIR, f) for f in os.listdir(ROOT_DIR)]
    GROUP_LIST = distrib_works(INPUT_FILES, 6)
    (RECEIVER, SENDER) = mp.Pipe()
    PROCESS_LIST = [FineTargetTaxiProcess(element, i, SENDER)
                    for i, element in enumerate(GROUP_LIST)]
    PRINTER_PROCESS = PrinterProcess("./data/usepipe.txt", "./data/usepipe.log", RECEIVER)
    for process in PROCESS_LIST:
        process.daemon = True
        process.start()
    PRINTER_PROCESS.daemon = True
    PRINTER_PROCESS.start()
    for p in PROCESS_LIST:
        p.join()
```
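One related detail for the Pipe version (a sketch, not from the original code): Connection.recv() raises EOFError once every copy of the sending end has been closed, so if the parent closes its own SENDER after starting the workers, the writer can use that exception as its stop condition instead of looping forever.

```python
# in the main block, right after starting the worker processes:
SENDER.close()                   # the parent no longer holds a sending end

# and in the writer:
def write(self, pipe):
    with open(self.output_file, mode="w", newline="\n") as printer:
        while True:
            try:
                row = pipe.recv()
            except EOFError:     # every sender has exited and closed its end
                break
            print(row, file=printer)
```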