We needed fast transfer of data from our key-value database. As a protocol, classical HTTP has been proposed, while the response would be JSON lines.
First of all, e.g. kafka or protocol buffers would be a better choice. Unfortunately, for various reasons, none of these were possible in our cases.
The Python program was responsible for:
- fetching data from DB through HTTP request
- deserializing from JSON
- process the data somehow
- push them to
I decided to use multiprocessing for this, resulting in the following architecture:
- initialize N workers, who are going to consume from the input queue and send data to
- master process will execute the download and push data to the input queue (from where workers are going to consume)
Machines were on AWS EC2, so we wanted to leverage the fact that IO could be quite fast. Nevertheless, the streaming took unprecedented amount of time however I tried to optimize.
def stream_requests(url, chunk_size=10000): import requests response = requests.get( url, stream=True ) for chunk in response.iter_lines(chunk_size=chunk_size, delimiter=b'\n'): if chunk: yield json.loads(chunk)
is terribly slow - it got us only to something like 3MB/s on AWS, which is compared to 200 MB/s you get in curl quite embarrassing. Different chunksize didn't change much, it was even worse...
So I tried something like this, but the result was the same...
Hence, I capitulated to the following totally obscure and non-elegant solution, which can do it only few times slower then the raw CLI variant:
import json import subprocess import logging # standard logging logger = logging.getLogger(__name__) def stream_curl(url): process = subprocess.Popen(['/usr/bin/curl', '-s', url], stdout=subprocess.PIPE) ix = 0 empty_lines = 0 while True: line = process.stdout.readline() # process ended if process.poll() is not None: logger.info('Downloading completed after %s lines.', ix) break ix += 1 if not line: logger.warning('Empty line: `%s`!', ix) if empty_lines > 10: raise IOError('To many empty lines when downloading data!') empty_lines += 1 else: yield json.loads(line)
and then use it as:
my_stream = stream_curl(url) for json_object in my_stream(): do_whatever_you_want(json_object)
I almost cried, but that's how the world is. Get over that.