Gbolahan Okerayi | Senior Software Engineer, Civis Analytics

illustration of a champagne bottle being poured on a tower of glasses and a node-based data flow chart

Imagine a champagne tower. You pour champagne into the top glass until it fills up, then the champagne trickles down and fills the glasses below. The process doesn’t stop until there’s no more champagne added to the top glass.

Why are we talking about champagne and glasses? The way in which champagne flows down a champagne tower is very much like how we need data to flow through various processes in Civis Analytics’ data pipelines. 

To this end, Civis is excited to introduce async-graph-data-flow, our new open-source Python library for executing asynchronous functions that pass data along a directed acyclic graph.

Python, n.

Python is a popular general-purpose programming and scripting language, widely used to build applications that process significant amounts of data.

Why Build async-graph-data-flow?

At a high level, Civis needs a way to run functions concurrently to import, transform, and export data in ETL pipelines. Why do we want concurrency? What’s wrong with chaining together functions with blocking code like f3(f2(f1(…)))? For our purposes, data pipelines involve a lot of I/O-bound processes (e.g., API calls, file operations), and for the sake of efficiency (and graceful failures), we need the non-blocking behavior concurrency provides.
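To make the contrast concrete, here is a minimal sketch that uses only asyncio from the Python standard library. The function fetch_page and its 0.1-second delay are hypothetical stand-ins for real I/O such as an API call; none of this is code from Civis’s pipelines.

```python
import asyncio
import time


async def fetch_page(page: int) -> dict:
    """Hypothetical stand-in for an I/O-bound call such as an API request."""
    await asyncio.sleep(0.1)  # simulated network latency
    return {"page": page, "rows": 50}


async def run_sequentially(pages: list[int]) -> list[dict]:
    # Blocking style: each call waits for the previous one to finish.
    return [await fetch_page(p) for p in pages]


async def run_concurrently(pages: list[int]) -> list[dict]:
    # Concurrent style: all calls wait on their I/O at the same time.
    return await asyncio.gather(*(fetch_page(p) for p in pages))


def timed(coro) -> tuple[list[dict], float]:
    """Run a coroutine to completion and report how long it took."""
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start


pages = list(range(5))
sequential_result, sequential_seconds = timed(run_sequentially(pages))
concurrent_result, concurrent_seconds = timed(run_concurrently(pages))
print(f"sequential: {sequential_seconds:.2f}s, concurrent: {concurrent_seconds:.2f}s")
```

Both versions return the same data, but the sequential one pays the simulated latency once per page, while the concurrent one overlaps the waits.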

ETL, v.

ETL stands for “extract, transform, and load,” and is standard practice in data integration. Data is extracted from a source, transformed into an archivable format, and then uploaded to a database.

Despite how counterintuitive it may seem at first blush, we want a lightweight tool that runs on a single machine. While we need to handle a large amount of data each day through Civis Platform’s distributed computing infrastructure, we design our ETL data pipelines to be modular and orchestrated by Civis Platform Workflows. Typically, each component in a pipeline is a job that runs in a containerized environment on an AWS EC2 instance; it is within this single-machine environment that we need to run functions concurrently to handle data. Given our context, the big players such as Airflow, Dask, etc. are not a good fit for us.

Previously, we used the third-party package Bonobo as the data pipeline executor engine. Bonobo is an ETL framework for Python that provides tools for building data transformation pipelines using plain Python primitives, and executing them in parallel. However, Bonobo has created a few issues for Civis’s internal tech stack, among them problems caused by the synchronous nature of its queue implementation.

When looking for a replacement for Bonobo, we realized it was entirely possible to write our own framework — one that is generalized enough to share with the rest of the world.

Enter async-graph-data-flow, which leverages asynchronous programming to run your functions concurrently. Python has officially supported asynchronous programming since version 3.4 through the asyncio module, and the async/await syntax followed in Python 3.5. Asynchronous code is well suited to I/O-heavy workloads, and it avoids the overhead and debugging issues that arise with threading. async-graph-data-flow is built on top of asyncio in the Python standard library, and doesn’t depend on any third-party libraries.

How async-graph-data-flow Works

  • Asynchronous queues connect the functions: In response to the problems caused by the synchronous nature of Bonobo’s queue implementation, we are using asyncio.Queue instances to improve queue size management and data transmission between the nodes. A node no longer has to wait for its upstream node to finish executing; every node can start working as soon as data becomes available in its queue.
  • Multiple workers for a specific task: When adding a node to the graph, we have the option to divide long-running or heavy tasks into several asynchronous tasks. For I/O-bound nodes, this can make the program run substantially faster.
  • Mixing synchronous and asynchronous code: By accessing the running event loop, we can successfully call non-async code from async code. With this functionality, we can efficiently create our ETL pipelines to interface with Civis Platform using the Civis Python API client, and also take advantage of other non-async-based libraries.
  • Data flow statistics and logging: As the execution unfolds along the graph of async functions, we needed a way to track how much data has flowed through a given node, as well as a method to log such information. async-graph-data-flow provides utilities for collecting these statistics and logging them at regular intervals.
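The ideas in the list above can be sketched with nothing but the standard library. The example below is an illustration of the underlying techniques, not async-graph-data-flow’s actual API (see the package’s documentation for that). Every name in it (extract, transform, load, slow_sync_transform, SENTINEL, stats) is hypothetical. It connects three nodes with asyncio.Queue instances, runs several workers for the middle node, calls a blocking function through the running event loop, and counts the items each node handles.

```python
import asyncio
import time
from collections import Counter

SENTINEL = None    # hypothetical end-of-stream marker
stats = Counter()  # number of items handled per node


def slow_sync_transform(value: int) -> int:
    """Hypothetical blocking function, e.g. a call into a non-async client."""
    time.sleep(0.01)
    return value * 10


async def extract(out_queue: asyncio.Queue, n_items: int, n_consumers: int) -> None:
    for i in range(n_items):
        await out_queue.put(i)  # waits whenever the bounded queue is full
        stats["extract"] += 1
    for _ in range(n_consumers):
        await out_queue.put(SENTINEL)  # one stop signal per downstream worker


async def transform(in_queue: asyncio.Queue, out_queue: asyncio.Queue) -> None:
    loop = asyncio.get_running_loop()
    while (item := await in_queue.get()) is not SENTINEL:
        # Run the blocking function in a thread so the event loop stays free.
        result = await loop.run_in_executor(None, slow_sync_transform, item)
        await out_queue.put(result)
        stats["transform"] += 1


async def load(in_queue: asyncio.Queue, sink: list) -> None:
    while (item := await in_queue.get()) is not SENTINEL:
        sink.append(item)
        stats["load"] += 1


async def main(n_items: int = 20, n_workers: int = 4) -> list:
    q1 = asyncio.Queue(maxsize=5)  # extract -> transform
    q2 = asyncio.Queue(maxsize=5)  # transform -> load
    sink = []
    # Downstream nodes start first and pick up items as soon as they arrive.
    workers = [asyncio.create_task(transform(q1, q2)) for _ in range(n_workers)]
    loader = asyncio.create_task(load(q2, sink))
    await extract(q1, n_items, n_workers)
    await asyncio.gather(*workers)  # all transform workers have drained q1
    await q2.put(SENTINEL)
    await loader
    return sink


loaded = asyncio.run(main())
print(dict(stats))
```

With several workers on one node, items may reach the final node out of order, so a pipeline built this way should not rely on ordering. The bounded queues also provide back-pressure: a fast upstream node waits instead of filling memory when a downstream node falls behind.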

What’s Next

Using async-graph-data-flow to rethink how we build our ETL pipelines: 

This blog post has only scratched the surface of what is possible with this package. For more information on how to get started with async-graph-data-flow, its other features, and examples of usage, please see the official documentation.

For instance, here is sample code that uses the async-graph-data-flow package to pull down data about U.S. breweries from Open Brewery DB (at the time of this writing in early 2023, there are around 8,000 of these breweries):

(It’s only fitting that we began this blog post with champagne and ended it with beer, right?)

While async-graph-data-flow is designed for more experienced Python developers who would like to build their own downstream data pipelines and frameworks, Civis Platform users are also going to enjoy the benefits of this new open-source library as we roll it out across Platform’s data integration ecosystem.

Interested in how you can make use of Civis Platform for your data needs?

Speak with our team of data engineers today!