
Python: Producer's queue consumed by workers

I keep running into a simple, recurring problem. The flow is almost always the same:

  1. Receive data as a stream from an external source (reading a big file in chunks, a streaming HTTP response such as JSON Lines, and so on)
  2. Process the data
  3. Send it somewhere else

When you don't have to worry about speed, you can just do all of this in batch: download the whole stream, process it, and send it on.
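
For comparison, the batch flow can be sketched roughly as below. This is only an illustration: `read_stream`, `process`, and `run_batch` are hypothetical names, the source is simulated with a generator, and a plain list stands in for the destination.

```python
def read_stream(n_items):
    """Hypothetical stand-in for a streaming source; yields items one by one."""
    for i in range(n_items):
        yield i


def process(item):
    """Hypothetical processing step; here it just doubles the value."""
    return item * 2


def run_batch(n_items):
    # 1. Receive: materialize the whole stream before doing anything else.
    received = list(read_stream(n_items))
    # 2. Process: nothing is transformed until the download has finished.
    processed = [process(item) for item in received]
    # 3. Send: a list stands in for the external sink.
    sent = []
    for item in processed:
        sent.append(item)
    return sent


if __name__ == "__main__":
    print(run_batch(5))  # prints [0, 2, 4, 6, 8]
```

Each stage sits idle while the others run, which is exactly the time the queue-based version tries to win back.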

But what if you want to do all three steps at once, ideally with multiprocessing or multithreading? This can save a lot of time because, for example, while data is still downloading you can already be processing what has arrived.

Python (as usual) comes with batteries included. The queue abstraction fits this setting well, so you don't have to bother with low-level primitives such as semaphores and locks.
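
To get a feel for the abstraction first, here is a minimal sketch that uses threads and `queue.Queue` from the standard library rather than processes. The names `consumer` and `run` are made up for this illustration, and squaring a number stands in for real work. Note that there is no explicit lock anywhere: the queues do all the synchronization.

```python
import queue
import threading


def consumer(in_q, out_q):
    while True:
        item = in_q.get()        # blocks until an item is available
        out_q.put(item * item)   # hand the result over through another queue
        in_q.task_done()         # tell in_q that this item is fully handled


def run(n_items, n_consumers=2):
    in_q = queue.Queue()
    out_q = queue.Queue()
    for _ in range(n_consumers):
        # Daemon threads are abandoned when the main thread exits.
        threading.Thread(target=consumer, args=(in_q, out_q), daemon=True).start()
    for i in range(n_items):
        in_q.put(i)
    in_q.join()  # returns once task_done() was called for every item put
    # Every result was put on out_q before its task_done(), so all are there.
    return sorted(out_q.get() for _ in range(n_items))


if __name__ == "__main__":
    print(run(5))  # prints [0, 1, 4, 9, 16]
```

Threads are usually enough when the work is I/O-bound; for CPU-heavy processing, separate processes are the usual choice in CPython.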

Here is my solution, which should be pretty self-explanatory:

import multiprocessing as mp  
import time

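def worker(queue, kafka_queue):
    while True:
        msg = queue.get()         # Block until an item is available
        print('Processing %s (MP: %s) ' % (msg, mp.current_process().name))
        time.sleep(0.01)          # Simulate the actual processing work
        kafka_queue.put(msg)      # Hand the result over to the sender
        queue.task_done()         # Mark the item as done so queue.join() can return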

def send_kafka(queue):
    # Stand-in for a real Kafka producer: just print what would be sent
    while True:
        msg = queue.get()
        print('Kafka sending %s' % msg)
        queue.task_done()


def writer(count, queue):
    # Simulate the incoming stream by writing 'count' numbers into the queue
    for ii in range(count):
        print('Writing %s (MP: %s)' % (ii, mp.current_process().name))
        queue.put(ii)
        time.sleep(0.001)         # Simulate a short delay between incoming items

if __name__ == '__main__':
    count = 30

    # initialize queues
    queue = mp.JoinableQueue()        # this is where we store the input data
    kafka_queue = mp.JoinableQueue()  # this is where we push the processed data out

    # create 4 worker processes that take both queues as arguments;
    # the data gets in and out through the queues
    # daemonize them so they are terminated when the main process exits
    processes = []
    for i in range(4):
        worker_process = mp.Process(target=worker, args=(queue, kafka_queue), daemon=True, name='worker_process_{}'.format(i))
        worker_process.start()        # Launch worker() as a separate Python process
        processes.append(worker_process)

    print([x.name for x in processes])

    # this process simulates sending processed data out 
    kafka_process = mp.Process(target=send_kafka, args=(kafka_queue,), daemon=True, name='kafka_process')
    kafka_process.start()


    _start = time.time()
    writer(count, queue)    # Feed the workers with input
    # wait until every item has been processed and then sent
    queue.join()
    kafka_queue.join()
    print("Sending %s numbers to Queue() took %s seconds" % (count, (time.time() - _start)))

And the output (the exact interleaving varies from run to run) is:

['worker_process_0', 'worker_process_1', 'worker_process_2', 'worker_process_3']
Writing 0 (MP: MainProcess)  
Processing 0 (MP: worker_process_0)  
Writing 1 (MP: MainProcess)  
Processing 1 (MP: worker_process_1)  
Writing 2 (MP: MainProcess)  
Processing 2 (MP: worker_process_2)  
Writing 3 (MP: MainProcess)  
Processing 3 (MP: worker_process_3)  
Writing 4 (MP: MainProcess)  
Writing 5 (MP: MainProcess)  
Writing 6 (MP: MainProcess)  
Writing 7 (MP: MainProcess)  
Writing 8 (MP: MainProcess)  
Writing 9 (MP: MainProcess)  
Writing 10 (MP: MainProcess)  
Processing 4 (MP: worker_process_0)  
Kafka sending 0  
Writing 11 (MP: MainProcess)  
Kafka sending 1  
Processing 5 (MP: worker_process_1)  
Processing 6 (MP: worker_process_2)  
Kafka sending 2  
Writing 12 (MP: MainProcess)  
Writing 13 (MP: MainProcess)  
Writing 14 (MP: MainProcess)  
Kafka sending 3  
Processing 7 (MP: worker_process_3)  
Writing 15 (MP: MainProcess)  
Writing 16 (MP: MainProcess)  
Writing 17 (MP: MainProcess)  
Writing 18 (MP: MainProcess)  
Processing 8 (MP: worker_process_0)  
Kafka sending 4  
Writing 19 (MP: MainProcess)  
Processing 9 (MP: worker_process_1)  
Kafka sending 5  
Kafka sending 6  
Processing 10 (MP: worker_process_2)  
Writing 20 (MP: MainProcess)  
Writing 21 (MP: MainProcess)  
Writing 22 (MP: MainProcess)  
Processing 11 (MP: worker_process_3)  
Writing 23 (MP: MainProcess)  
Kafka sending 7  
Writing 24 (MP: MainProcess)  
Writing 25 (MP: MainProcess)  
Writing 26 (MP: MainProcess)  
Writing 27 (MP: MainProcess)  
Kafka sending 8  
Processing 12 (MP: worker_process_0)  
Writing 28 (MP: MainProcess)  
Processing 13 (MP: worker_process_1)  
Kafka sending 9  
Processing 14 (MP: worker_process_2)  
Kafka sending 10  
Writing 29 (MP: MainProcess)  
Processing 15 (MP: worker_process_3)  
Kafka sending 11  
Processing 16 (MP: worker_process_0)  
Kafka sending 12  
Kafka sending 13  
Processing 17 (MP: worker_process_1)  
Processing 18 (MP: worker_process_2)  
Kafka sending 14  
Kafka sending 15  
Processing 19 (MP: worker_process_3)  
Kafka sending 16  
Processing 20 (MP: worker_process_0)  
Processing 21 (MP: worker_process_1)  
Kafka sending 17  
Processing 22 (MP: worker_process_2)  
Kafka sending 18  
Kafka sending 19  
Processing 23 (MP: worker_process_3)  
Kafka sending 20  
Processing 24 (MP: worker_process_0)  
Processing 25 (MP: worker_process_1)  
Kafka sending 21  
Kafka sending 22  
Processing 26 (MP: worker_process_2)  
Kafka sending 23  
Processing 27 (MP: worker_process_3)  
Processing 28 (MP: worker_process_0)  
Kafka sending 24  
Kafka sending 25  
Processing 29 (MP: worker_process_1)  
Kafka sending 26  
Kafka sending 27  
Kafka sending 28  
Kafka sending 29  
Sending 30 numbers to Queue() took 0.0899205207824707 seconds
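
The script above relies on daemon processes being killed when the main process exits. If you would rather have the consumers finish on their own, one common alternative is a sentinel value that tells each consumer to stop. The sketch below is a variation under stated assumptions, not the code from this post: it uses threads and `queue.Queue` to stay short, `STOP`, `sender`, and `run_pipeline` are made-up names, and multiplying by 10 stands in for real processing. The same idea should carry over to `multiprocessing` queues and processes.

```python
import queue
import threading

STOP = None  # sentinel: assumes None is never a real message


def worker(in_q, out_q):
    while True:
        msg = in_q.get()
        if msg is STOP:
            break                # leave the loop so the thread can finish
        out_q.put(msg * 10)      # pretend processing


def sender(out_q, sent):
    while True:
        msg = out_q.get()
        if msg is STOP:
            break
        sent.append(msg)         # a list stands in for the external sink


def run_pipeline(count, n_workers=4):
    in_q, out_q = queue.Queue(), queue.Queue()
    sent = []
    workers = [threading.Thread(target=worker, args=(in_q, out_q))
               for _ in range(n_workers)]
    sender_thread = threading.Thread(target=sender, args=(out_q, sent))
    for t in workers:
        t.start()
    sender_thread.start()

    for i in range(count):
        in_q.put(i)
    for _ in workers:
        in_q.put(STOP)           # one sentinel per worker
    for t in workers:
        t.join()                 # all workers have drained the input queue
    out_q.put(STOP)              # no worker is left, so this is the last message
    sender_thread.join()
    return sent


if __name__ == "__main__":
    print(sorted(run_pipeline(30)))
```

The order in which items reach the sender is not deterministic, which is why the usage line sorts the result before printing.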