multiprocessing_producer_consumer.py

"""Multi-process producer/consumer demo built on ``multiprocessing``.

One producer process fills a ``JoinableQueue`` with demo records while a
pool of consumer processes ("push dogs") drains it.  A shared integer flag
tells the consumers when the producer has finished so they can exit once
the queue is empty.
"""

import math
import multiprocessing
import os
import queue
import time

# Values stored in the shared status flag passed around as ``statusValue``.
STATUS_IDLE = 0       # producer has not started producing yet
STATUS_PRODUCING = 1  # producer is currently putting records on the queue
STATUS_DONE = 2       # producer is finished (normally or because it failed)


def get_push_list_from_bigdata(args, pushQueue, statusValue, record_count=20):
    """Producer: put ``record_count`` demo records on ``pushQueue``.

    Args:
        args: Caller-supplied options; not used by this demo producer.
        pushQueue: Queue-like object with a ``put`` method.
        statusValue: Shared object whose ``value`` attribute is set to
            ``STATUS_PRODUCING`` on entry and ``STATUS_DONE`` on exit.
        record_count: Number of records to produce (default 20, the number
            the demo always produced).

    Raises:
        Whatever ``pushQueue.put`` raises.  The status flag is still set to
        ``STATUS_DONE`` in that case so consumers do not wait forever.
    """
    statusValue.value = STATUS_PRODUCING
    try:
        for _ in range(record_count):
            pushQueue.put(["records", "task_id", "push_id", "context", "channel"])
    finally:
        # Always publish completion; consumers only exit once they see it.
        statusValue.value = STATUS_DONE
    # Report the flag's integer value rather than the wrapper object's repr.
    print("生产者 over pid: " + str(os.getpid()) + " status: " + str(statusValue.value))


def pushDog(pushQueue, statusValue, poll_interval=0.1):
    """Consumer: drain ``pushQueue`` until the producer is done and it is empty.

    Args:
        pushQueue: Queue-like object providing ``get(timeout=...)`` and
            ``task_done()`` (e.g. ``multiprocessing.JoinableQueue``).
        statusValue: Shared object whose ``value`` becomes ``STATUS_DONE``
            once the producer has finished.
        poll_interval: Seconds to block waiting for an item before
            re-checking the producer status.
    """
    pid = os.getpid()
    while True:
        # Sample the flag BEFORE polling.  A multiprocessing queue can look
        # empty for a short moment after put() while its feeder thread is
        # still flushing, so we only stop when the producer was already done
        # and a full poll interval then passed without any item arriving.
        producer_done = statusValue.value == STATUS_DONE
        try:
            qitem = pushQueue.get(timeout=poll_interval)
        except queue.Empty:
            if producer_done:
                print("pushDog over pid: " + str(pid))
                break
            continue
        try:
            print("消费者 pushDog pid: " + str(pid) + str(qitem))
        finally:
            # Acknowledge the item even if handling it failed, so a join()
            # on the queue cannot block on this item.
            pushQueue.task_done()


def push_from_bigdata(args):
    """Run the demo: start one producer and several consumers, then join them.

    Args:
        args: Options forwarded unchanged to the producer.
    """
    print("主进程id: " + str(os.getpid()))
    # Use roughly 40% of the CPUs, but never fewer than one consumer:
    # floor(cpu_count * 0.4) is 0 on 1-2 core machines, which would leave
    # the queue undrained.
    consumer_total = max(1, math.floor(multiprocessing.cpu_count() * 0.4))
    pushQueue = multiprocessing.JoinableQueue()
    # Shared producer status flag: see the STATUS_* constants.
    statusValue = multiprocessing.Value('i', STATUS_IDLE)

    process_list = []

    # Producer
    pushProducer = multiprocessing.Process(
        target=get_push_list_from_bigdata,
        kwargs={
            "pushQueue": pushQueue,
            "args": args,
            "statusValue": statusValue,
        },
    )
    process_list.append(pushProducer)
    pushProducer.start()

    # Consumers
    for _ in range(consumer_total):
        p = multiprocessing.Process(
            target=pushDog,
            kwargs={
                "pushQueue": pushQueue,
                "statusValue": statusValue,
            },
        )
        p.start()
        process_list.append(p)

    # Wait for the producer and every consumer to finish.
    print(process_list)
    for p in process_list:
        p.join()
        print("pid joined: " + str(p.pid))

    print("============================over")


if __name__ == "__main__":
    multiprocessing.freeze_support()
    push_from_bigdata({"a": 1})
Python 多进程生产者消费者 demo

Be the first to comment

You can use [html][/html], [css][/css], [php][/php] and more to embed the code. URLs are automatically hyperlinked. Line breaks and paragraphs are automatically generated.