import os
import time
import math
import multiprocessing
# Fetch data from the big-data source and write it into the queue
def get_push_list_from_bigdata(args, pushQueue, statusValue):
    """Producer: put a fixed batch of demo push records on the queue.

    The shared status flag is set to 1 while records are being produced and
    to 2 once production has ended, so that consumers polling the flag can
    tell when no further records will arrive.

    Args:
        args: Caller-supplied arguments; not used by this demo producer.
        pushQueue: Queue-like object providing ``put``; receives 20 records.
        statusValue: Shared object with a ``value`` attribute
            (0 = not started, 1 = producing, 2 = finished).
    """
    statusValue.value = 1
    try:
        for _ in range(20):
            pushQueue.put(["records", "task_id", "push_id", "context", "channel"])
    finally:
        # Always publish "finished", even if put() raises; otherwise the
        # consumers waiting for status 2 would never terminate.
        statusValue.value = 2
    # Print the numeric status, not the repr of the shared wrapper object.
    print("生产者 over pid: " + str(os.getpid()) + " status: " + str(statusValue.value))
# Push worker (consumer) process
def pushDog(pushQueue, statusValue, poll_interval=0.1):
    """Consumer: take records off the queue until the producer has finished.

    Each record is fetched with a blocking ``get`` that times out after
    ``poll_interval`` seconds, so an idle consumer sleeps inside the queue
    instead of spinning. The loop ends when a fetch times out and the
    producer had already signalled completion (status 2) before that fetch.

    Args:
        pushQueue: Queue-like object providing ``get(timeout=...)`` and
            ``task_done()``.
        statusValue: Shared object with a ``value`` attribute
            (0 = not started, 1 = producing, 2 = finished).
        poll_interval: Seconds to wait for a record before re-checking the
            producer status.
    """
    from queue import Empty  # raised by get() on timeout

    pid = os.getpid()
    while True:
        # Sample the flag BEFORE waiting: if it was already 2 and the wait
        # still finds nothing, the queue is treated as drained.
        # NOTE(review): a record still buffered inside the producer when the
        # flag flips could in principle arrive later than poll_interval;
        # sentinel records would close that gap entirely.
        producer_done = statusValue.value == 2
        try:
            qitem = pushQueue.get(timeout=poll_interval)
        except Empty:
            if producer_done:
                print("pushDog over pid: " + str(pid))
                break
            continue
        try:
            print("消费者 pushDog pid: " + str(pid) + str(qitem))
        finally:
            # Acknowledge only after the record has been handled.
            pushQueue.task_done()
# Fetch the push data from the big-data source and dispatch it to the push workers
def push_from_bigdata(args):
    """Run the demo pipeline: one producer process and several consumers.

    Starts ``get_push_list_from_bigdata`` as the producer and a number of
    ``pushDog`` consumers derived from the CPU count, all sharing one queue
    and one integer status flag, then waits for every process to exit.

    Args:
        args: Arguments forwarded unchanged to the producer.
    """
    print("主进程id: " + str(os.getpid()))
    # 40% of the cores rounds down to 0 on 1-2 core machines, which would
    # start no consumers at all; always run at least one.
    worker_count = max(1, math.floor(multiprocessing.cpu_count() * 0.4))
    pushQueue = multiprocessing.JoinableQueue()
    # Production status flag: 0 = not started, 1 = producing, 2 = finished.
    statusValue = multiprocessing.Value('i', 0)
    process_list = []

    # Producer
    pushProducer = multiprocessing.Process(
        target=get_push_list_from_bigdata,
        kwargs={"pushQueue": pushQueue, "args": args, "statusValue": statusValue},
    )
    process_list.append(pushProducer)
    pushProducer.start()

    # Consumers
    for _ in range(worker_count):
        p = multiprocessing.Process(
            target=pushDog,
            kwargs={"pushQueue": pushQueue, "statusValue": statusValue},
        )
        p.start()
        process_list.append(p)

    # Wait for the producer and every consumer to exit.
    print(process_list)
    for p in process_list:
        p.join()
        print("pid joined: " + str(p.pid))
    print("============================over")
if __name__ == "__main__":
    # freeze_support() must run first in the main module so the demo also
    # works when packaged as a frozen Windows executable.
    multiprocessing.freeze_support()
    push_from_bigdata(args={"a": 1})
# Python multiprocessing producer/consumer demo
#
# (Residue from the web page this snippet was copied from:)
# Be the first to comment
# You can use [html][/html], [css][/css], [php][/php] and more to embed the code. URLs are automatically hyperlinked. Line breaks and paragraphs are automatically generated.