Python: Producer's queue consumed by workers
I keep running into a simple, recurring problem. The flow is almost always the same:
- Receive data as a stream from an external source (reading a big file in chunks, a streaming HTTP response such as JSON Lines, and so on)
- Process the data
- Send it somewhere else
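As a rough illustration of the first step, a stream can be consumed lazily with a generator instead of being loaded all at once. This is only a sketch assuming a JSON Lines source; the helper name `iter_jsonlines` and the in-memory `io.StringIO` stand-in for a real file or HTTP response are hypothetical.

```python
import io
import json


def iter_jsonlines(stream):
    """Yield one parsed record per non-empty line, without loading the whole stream."""
    for line in stream:
        line = line.strip()
        if line:  # skip blank lines
            yield json.loads(line)


# Stand-in for a large file or a streaming HTTP response body (made-up data).
source = io.StringIO('{"id": 1}\n{"id": 2}\n\n{"id": 3}\n')
records = list(iter_jsonlines(source))
print(records)
```

In a real pipeline you would not call `list()` on the generator; you would push each record into a queue as it arrives.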
When you don't have to worry about speed, you can just do all of this as a batch: download the whole stream, process it, and send it on.
But what if you want to do it all at once, ideally with multiprocessing or multithreading? This approach can save a lot of time, because the stages overlap: while you are still downloading data, you can already be processing what you have received.
Python (as usual) comes with batteries included. The Queue abstraction is a great fit for this setting, so you don't have to bother with low-level primitives such as semaphores and locks.
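To make the queue contract concrete, here is a minimal sketch of the put/get/task_done/join handshake. It uses the standard library's thread-based `queue.Queue`, which exposes the same `task_done()` and `join()` methods as `multiprocessing.JoinableQueue`; the `consumer` function and the `results` list are illustrative names only.

```python
import queue
import threading

results = []


def consumer(q):
    while True:
        item = q.get()            # blocks until an item is available
        results.append(item * 2)  # placeholder for real work
        q.task_done()             # tell the queue this item is fully handled


q = queue.Queue()
threading.Thread(target=consumer, args=(q,), daemon=True).start()

for n in range(5):
    q.put(n)

q.join()  # returns only after task_done() has been called once per put()
print(results)
```

The point is that `join()` waits for the work to be finished, not merely for the queue to be empty, which is why every `get()` has to be paired with a `task_done()`.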
Here is my solution, which should be pretty self-explanatory:
import multiprocessing as mp
import time


def worker(queue, kafka_queue):
    while True:
        msg = queue.get()  # block until an item arrives on the input queue
        print('Processing %s (MP: %s) ' % (msg, mp.current_process().name))
        time.sleep(0.01)  # simulate the actual processing work
        kafka_queue.put(msg)  # hand the result over to the sender
        queue.task_done()


def send_kafka(queue):
    while True:
        msg = queue.get()
        print('Kafka sending %s' % msg)
        queue.task_done()


def writer(count, queue):
    # write 'count' numbers into the queue
    for ii in range(0, count):
        print('Writing %s (MP: %s)' % (ii, mp.current_process().name))
        queue.put(ii)
        time.sleep(0.001)


if __name__ == '__main__':
    count = 30

    # initialize queues
    queue = mp.JoinableQueue()  # this is where we are going to store input data
    kafka_queue = mp.JoinableQueue()  # this is where we are going to push them out

    # create 4 processes, which take the queues as arguments
    # the data will get in and out through the queues
    # daemonize them so they exit together with the main process
    processes = []
    for i in range(4):
        worker_process = mp.Process(target=worker, args=(queue, kafka_queue), daemon=True, name='worker_process_{}'.format(i))
        worker_process.start()  # launch worker() as a separate Python process
        processes.append(worker_process)
    print([x.name for x in processes])

    # this process simulates sending processed data out
    kafka_process = mp.Process(target=send_kafka, args=(kafka_queue,), daemon=True, name='kafka_process')
    kafka_process.start()

    _start = time.time()
    writer(count, queue)  # feed the numbers to the workers

    # wait till everything is processed
    queue.join()
    kafka_queue.join()
    print("Sending %s numbers to Queue() took %s seconds" % (count, (time.time() - _start)))
And the output is:
['worker_process_0', 'worker_process_1', 'worker_process_2', 'worker_process_3']
Writing 0 (MP: MainProcess)
Processing 0 (MP: worker_process_0)
Writing 1 (MP: MainProcess)
Processing 1 (MP: worker_process_1)
Writing 2 (MP: MainProcess)
Processing 2 (MP: worker_process_2)
Writing 3 (MP: MainProcess)
Processing 3 (MP: worker_process_3)
Writing 4 (MP: MainProcess)
Writing 5 (MP: MainProcess)
Writing 6 (MP: MainProcess)
Writing 7 (MP: MainProcess)
Writing 8 (MP: MainProcess)
Writing 9 (MP: MainProcess)
Writing 10 (MP: MainProcess)
Processing 4 (MP: worker_process_0)
Kafka sending 0
Writing 11 (MP: MainProcess)
Kafka sending 1
Processing 5 (MP: worker_process_1)
Processing 6 (MP: worker_process_2)
Kafka sending 2
Writing 12 (MP: MainProcess)
Writing 13 (MP: MainProcess)
Writing 14 (MP: MainProcess)
Kafka sending 3
Processing 7 (MP: worker_process_3)
Writing 15 (MP: MainProcess)
Writing 16 (MP: MainProcess)
Writing 17 (MP: MainProcess)
Writing 18 (MP: MainProcess)
Processing 8 (MP: worker_process_0)
Kafka sending 4
Writing 19 (MP: MainProcess)
Processing 9 (MP: worker_process_1)
Kafka sending 5
Kafka sending 6
Processing 10 (MP: worker_process_2)
Writing 20 (MP: MainProcess)
Writing 21 (MP: MainProcess)
Writing 22 (MP: MainProcess)
Processing 11 (MP: worker_process_3)
Writing 23 (MP: MainProcess)
Kafka sending 7
Writing 24 (MP: MainProcess)
Writing 25 (MP: MainProcess)
Writing 26 (MP: MainProcess)
Writing 27 (MP: MainProcess)
Kafka sending 8
Processing 12 (MP: worker_process_0)
Writing 28 (MP: MainProcess)
Processing 13 (MP: worker_process_1)
Kafka sending 9
Processing 14 (MP: worker_process_2)
Kafka sending 10
Writing 29 (MP: MainProcess)
Processing 15 (MP: worker_process_3)
Kafka sending 11
Processing 16 (MP: worker_process_0)
Kafka sending 12
Kafka sending 13
Processing 17 (MP: worker_process_1)
Processing 18 (MP: worker_process_2)
Kafka sending 14
Kafka sending 15
Processing 19 (MP: worker_process_3)
Kafka sending 16
Processing 20 (MP: worker_process_0)
Processing 21 (MP: worker_process_1)
Kafka sending 17
Processing 22 (MP: worker_process_2)
Kafka sending 18
Kafka sending 19
Processing 23 (MP: worker_process_3)
Kafka sending 20
Processing 24 (MP: worker_process_0)
Processing 25 (MP: worker_process_1)
Kafka sending 21
Kafka sending 22
Processing 26 (MP: worker_process_2)
Kafka sending 23
Processing 27 (MP: worker_process_3)
Processing 28 (MP: worker_process_0)
Kafka sending 24
Kafka sending 25
Processing 29 (MP: worker_process_1)
Kafka sending 26
Kafka sending 27
Kafka sending 28
Kafka sending 29
Sending 30 numbers to Queue() took 0.0899205207824707 seconds
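The solution above relies on daemon processes that simply die when the main process exits. If you would rather shut the consumers down explicitly, one common alternative is to send a sentinel value through the queues. The sketch below is not the code from this post: it uses threads instead of processes (often good enough when the work is I/O bound), and `SENTINEL`, `NUM_WORKERS`, `sender` and the squaring step are placeholders I made up for illustration.

```python
import queue
import threading

SENTINEL = None  # stop marker (assumes None is never real data)
NUM_WORKERS = 4


def worker(in_q, out_q):
    while True:
        msg = in_q.get()
        if msg is SENTINEL:
            break
        out_q.put(msg * msg)  # placeholder for real processing


def sender(out_q, sent):
    while True:
        msg = out_q.get()
        if msg is SENTINEL:
            break
        sent.append(msg)  # placeholder for pushing to an external system


in_q, out_q, sent = queue.Queue(), queue.Queue(), []
workers = [threading.Thread(target=worker, args=(in_q, out_q)) for _ in range(NUM_WORKERS)]
sender_thread = threading.Thread(target=sender, args=(out_q, sent))
for t in workers + [sender_thread]:
    t.start()

for i in range(30):
    in_q.put(i)
for _ in workers:
    in_q.put(SENTINEL)  # one sentinel per worker
for t in workers:
    t.join()            # all processing done, every result is in out_q
out_q.put(SENTINEL)     # only now tell the sender to stop
sender_thread.join()

print(len(sent))
```

Each worker consumes exactly one sentinel and then exits, so the number of sentinels has to match the number of workers. The sender gets its sentinel only after all workers have finished, which guarantees nothing is left behind in the output queue.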