I keep running into a simple, recurring problem. The flow is almost always the same:

  1. Receive data as a stream from an external source (reading a big file in chunks, a streaming HTTP response such as JSON Lines, and so on)
  2. Process the data
  3. Send it somewhere else
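For the first step, one common way to read a big file in chunks is the two-argument form of the built-in `iter()`, which keeps calling a function until it returns a sentinel value. This is a minimal sketch assuming a binary file object. `read_in_chunks` and the tiny 4-byte chunk size are illustrative only; real code would use something much larger, such as 64 KiB.

```python
import io
from functools import partial


def read_in_chunks(fileobj, chunk_size=4):
    # iter(callable, sentinel) calls fileobj.read(chunk_size) repeatedly
    # and stops as soon as it returns b"" (end of file)
    for chunk in iter(partial(fileobj.read, chunk_size), b""):
        yield chunk


data = io.BytesIO(b"hello world")   # stand-in for open("big_file", "rb")
chunks = list(read_in_chunks(data))
print(chunks)  # [b'hell', b'o wo', b'rld']
```

Because `read_in_chunks` is a generator, only one chunk is held in memory at a time.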

When you don't have to worry about speed, you can just do all of this in batch: download the whole stream, process it, and send it over.
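For comparison, the batch approach might look like this sketch. It assumes the stream is JSON Lines with a hypothetical `value` field, and it "sends" by simply returning the results. Nothing is processed until the whole payload is in memory, and nothing is sent until everything has been processed.

```python
import json


def batch_pipeline(raw_text):
    # 1. "Download" everything first: the whole payload sits in memory
    records = [json.loads(line) for line in raw_text.splitlines() if line.strip()]
    # 2. Process all records, but only after the download has finished
    processed = [record["value"] * 2 for record in records]
    # 3. Send everything at the end (here: just return it)
    return processed


payload = '{"value": 1}\n{"value": 2}\n{"value": 3}\n'
print(batch_pipeline(payload))  # [2, 4, 6]
```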

But what if you want to do it all at once, ideally with multiprocessing or multithreading? This approach can save a lot of time, because while you are still downloading data, you can already be processing what you have received so far.
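The time saving comes from overlapping the stages. Here is a minimal thread-based sketch of that idea. It is not the multiprocessing solution itself, and it assumes a simulated download and simulated processing of 10 ms per item each; `DONE` is a hypothetical end-of-stream marker. Since the consumer starts on item 0 while the rest are still arriving, the run should take roughly 21 × 10 ms instead of the 2 × 20 × 10 ms a sequential run would need.

```python
import queue
import threading
import time

DONE = object()  # hypothetical sentinel marking the end of the stream


def download(q, n):
    # Simulated slow source: each item takes 10 ms to "arrive"
    for i in range(n):
        time.sleep(0.01)
        q.put(i)
    q.put(DONE)


def process(q, results):
    # Works on each item as soon as it arrives, while later items are still downloading
    while True:
        item = q.get()
        if item is DONE:
            break
        time.sleep(0.01)  # simulated processing cost
        results.append(item * 2)


q = queue.Queue()
results = []
start = time.perf_counter()
producer = threading.Thread(target=download, args=(q, 20))
consumer = threading.Thread(target=process, args=(q, results))
producer.start()
consumer.start()
producer.join()
consumer.join()
elapsed = time.perf_counter() - start

print(results[:5])  # [0, 2, 4, 6, 8]
print(f"took {elapsed:.2f}s")
```

Threads overlap here only because `time.sleep` (like real network I/O) releases the GIL. CPU-bound processing in pure Python would not speed up with threads, which is one reason to reach for multiprocessing instead.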

Python (as usual) comes with batteries included. Queues are a great abstraction for this setting, so you don't have to bother with low-level primitives such as semaphores and locks.
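To illustrate what the queue handles for you, here is a small thread-based sketch using `queue.Queue`. Its `task_done()`/`join()` pair is the same one that `multiprocessing.JoinableQueue` provides; plain `multiprocessing.Queue` does not have these two methods. Results are passed through a second queue, so no explicit lock is needed.

```python
import queue
import threading


def worker(tasks, results):
    while True:
        item = tasks.get()        # blocks until an item is available
        results.put(item ** 2)    # hand the result to the next stage
        tasks.task_done()         # tell the queue this item is finished


tasks = queue.Queue()
results = queue.Queue()
for _ in range(3):
    # daemon threads are stopped automatically when the program exits
    threading.Thread(target=worker, args=(tasks, results), daemon=True).start()

for i in range(10):
    tasks.put(i)

tasks.join()  # returns once task_done() has been called for every put()
squares = sorted(results.get() for _ in range(10))
print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Because each worker puts its result *before* calling `task_done()`, every result is already in `results` by the time `tasks.join()` returns.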

Here is my solution, which should be pretty self-explanatory:

```python
import multiprocessing as mp
import time


def worker(queue, kafka_queue):
    while True:
        msg = queue.get()         # block until an item is available
        print('Processing %s (MP: %s) ' % (msg, mp.current_process().name))
        time.sleep(0.01)          # simulate processing
        kafka_queue.put(msg)      # hand the result over to the sender
        # mark the item done only after the hand-off, so that once
        # queue.join() returns, every item is already in kafka_queue
        queue.task_done()


def send_kafka(queue):
    while True:
        msg = queue.get()
        print('Kafka sending %s' % msg)  # stand-in for a real Kafka producer
        queue.task_done()


def writer(count, queue):
    # Write `count` numbers into the queue, simulating a stream that arrives over time
    for ii in range(count):
        print('Writing %s (MP: %s)' % (ii, mp.current_process().name))
        queue.put(ii)
        time.sleep(0.001)


if __name__ == '__main__':
    count = 30

    # initialize queues
    queue = mp.JoinableQueue()        # input data waiting to be processed
    kafka_queue = mp.JoinableQueue()  # processed data waiting to be sent out

    # create 4 worker processes that take the queues as arguments;
    # data flows in and out through the queues.
    # They are daemonic, so they are terminated when the main process exits.
    processes = []
    for i in range(4):
        worker_process = mp.Process(target=worker, args=(queue, kafka_queue),
                                    daemon=True, name='worker_process_{}'.format(i))
        worker_process.start()        # launch worker() as a separate Python process
        processes.append(worker_process)

    print([x.name for x in processes])

    # this process simulates sending processed data out
    kafka_process = mp.Process(target=send_kafka, args=(kafka_queue,),
                               daemon=True, name='kafka_process')
    kafka_process.start()

    _start = time.time()
    writer(count, queue)    # feed the workers
    # wait until everything is processed, then until everything is sent
    queue.join()
    kafka_queue.join()
    print("Sending %s numbers to Queue() took %s seconds" % (count, (time.time() - _start)))
```
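One thing this code leaves to `daemon=True` is shutdown: the workers and the sender loop forever and are simply terminated when the main process exits. That is fine for a demo, but a real sender may need to flush or close its client first. A common alternative is to send one sentinel ("poison pill") per consumer. The sketch below shows the idea with threads and `queue.Queue` so it runs anywhere; the names and the `msg * 10` processing are hypothetical. With processes, `None` is a convenient sentinel because queued items are pickled, so an `object()` sentinel would no longer be identical (`is`) on the receiving side.

```python
import queue
import threading

SENTINEL = None  # end-of-stream marker; must never appear in real data


def worker(in_q, out_q):
    while True:
        msg = in_q.get()
        if msg is SENTINEL:
            break                 # exit cleanly instead of being killed
        out_q.put(msg * 10)       # stand-in for real processing


def sender(out_q, sent):
    while True:
        msg = out_q.get()
        if msg is SENTINEL:
            break                 # a real sender could flush/close its client here
        sent.append(msg)          # stand-in for sending to Kafka


in_q, out_q, sent = queue.Queue(), queue.Queue(), []
workers = [threading.Thread(target=worker, args=(in_q, out_q)) for _ in range(4)]
kafka = threading.Thread(target=sender, args=(out_q, sent))
for t in workers + [kafka]:
    t.start()

for i in range(30):
    in_q.put(i)
for _ in workers:
    in_q.put(SENTINEL)            # one stop signal per worker
for t in workers:
    t.join()                      # all workers exited, so out_q gets nothing more
out_q.put(SENTINEL)               # only now is it safe to stop the sender
kafka.join()

print(sorted(sent)[:5])  # [0, 10, 20, 30, 40]
```

The order of shutdown matters: the sender must only receive its sentinel after every worker has finished, otherwise it could stop while results are still on their way.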

And the output is (the exact interleaving varies from run to run):

```
['worker_process_0', 'worker_process_1', 'worker_process_2', 'worker_process_3']
Writing 0 (MP: MainProcess)
Processing 0 (MP: worker_process_0)
Writing 1 (MP: MainProcess)
Processing 1 (MP: worker_process_1)
Writing 2 (MP: MainProcess)
Processing 2 (MP: worker_process_2)
Writing 3 (MP: MainProcess)
Processing 3 (MP: worker_process_3)
Writing 4 (MP: MainProcess)
Writing 5 (MP: MainProcess)
Writing 6 (MP: MainProcess)
Writing 7 (MP: MainProcess)
Writing 8 (MP: MainProcess)
Writing 9 (MP: MainProcess)
Writing 10 (MP: MainProcess)
Processing 4 (MP: worker_process_0)
Kafka sending 0
Writing 11 (MP: MainProcess)
Kafka sending 1
Processing 5 (MP: worker_process_1)
Processing 6 (MP: worker_process_2)
Kafka sending 2
Writing 12 (MP: MainProcess)
Writing 13 (MP: MainProcess)
Writing 14 (MP: MainProcess)
Kafka sending 3
Processing 7 (MP: worker_process_3)
Writing 15 (MP: MainProcess)
Writing 16 (MP: MainProcess)
Writing 17 (MP: MainProcess)
Writing 18 (MP: MainProcess)
Processing 8 (MP: worker_process_0)
Kafka sending 4
Writing 19 (MP: MainProcess)
Processing 9 (MP: worker_process_1)
Kafka sending 5
Kafka sending 6
Processing 10 (MP: worker_process_2)
Writing 20 (MP: MainProcess)
Writing 21 (MP: MainProcess)
Writing 22 (MP: MainProcess)
Processing 11 (MP: worker_process_3)
Writing 23 (MP: MainProcess)
Kafka sending 7
Writing 24 (MP: MainProcess)
Writing 25 (MP: MainProcess)
Writing 26 (MP: MainProcess)
Writing 27 (MP: MainProcess)
Kafka sending 8
Processing 12 (MP: worker_process_0)
Writing 28 (MP: MainProcess)
Processing 13 (MP: worker_process_1)
Kafka sending 9
Processing 14 (MP: worker_process_2)
Kafka sending 10
Writing 29 (MP: MainProcess)
Processing 15 (MP: worker_process_3)
Kafka sending 11
Processing 16 (MP: worker_process_0)
Kafka sending 12
Kafka sending 13
Processing 17 (MP: worker_process_1)
Processing 18 (MP: worker_process_2)
Kafka sending 14
Kafka sending 15
Processing 19 (MP: worker_process_3)
Kafka sending 16
Processing 20 (MP: worker_process_0)
Processing 21 (MP: worker_process_1)
Kafka sending 17
Processing 22 (MP: worker_process_2)
Kafka sending 18
Kafka sending 19
Processing 23 (MP: worker_process_3)
Kafka sending 20
Processing 24 (MP: worker_process_0)
Processing 25 (MP: worker_process_1)
Kafka sending 21
Kafka sending 22
Processing 26 (MP: worker_process_2)
Kafka sending 23
Processing 27 (MP: worker_process_3)
Processing 28 (MP: worker_process_0)
Kafka sending 24
Kafka sending 25
Processing 29 (MP: worker_process_1)
Kafka sending 26
Kafka sending 27
Kafka sending 28
Kafka sending 29
Sending 30 numbers to Queue() took 0.0899205207824707 seconds
```
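A quick back-of-the-envelope check of that timing, using only the `time.sleep` calls in the code and ignoring process start-up, pickling, and queue overhead:

```python
import math

count = 30          # items written by writer()
workers = 4         # worker processes
process_s = 0.01    # time.sleep(0.01) per item in worker()
write_s = 0.001     # time.sleep(0.001) per item in writer()

# Doing everything one item at a time
sequential = count * (write_s + process_s)
# Each worker handles at most ceil(count / workers) items, one after another
parallel_floor = math.ceil(count / workers) * process_s

print(f"sequential ~ {sequential:.3f}s, 4 workers >= {parallel_floor:.3f}s")
# sequential ~ 0.330s, 4 workers >= 0.080s
```

So the roughly 0.09 s measured above sits close to the lower bound for four workers, while a strictly sequential version of the same work would take around 0.33 s.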