Introducing async-graph-data-flow

Gbolahan Okerayi
Senior Software Engineer, Civis Analytics
illustration of a campaign bottle being poured on a tower of glasses and a node-based data flow chart

Imagine a champagne tower. You pour champagne into the top glass until it fills up, then the champagne trickles down and fills the glasses below. The process doesn’t stop until there’s no more champagne added to the top glass.

Why are we talking about champagne and glasses? The way in which champagne flows down a champagne tower is very much like how we need data to flow through various processes in Civis Analytics’ data pipelines.

To this end, Civis is excited to introduce async-graph-data-flow, our new open-source Python library for executing asynchronous functions that pass data along a directed acyclic graph.

Python, n.

Python is a popular programming and scripting language for general use in constructing and streamlining computer code, including applications that incorporate significant amounts of data.

Explore the library on GitHub

Why Build async-graph-data-flow?

At a high level, Civis needs a way to run functions concurrently to import, transform, and export data in ETL pipelines. Why do we want concurrency? What’s wrong with chaining together functions with blocking code like f3(f2(f1(…)))? For our purposes, data pipelines involve a lot of I/O-bound processes (e.g., API calls, file operations), and for the sake of efficiency (and graceful failures), we need the non-blocking behavior concurrency provides.

ETL, v.

ETL stands for “extract, transform, and load,” and is standard practice in data integration. Data is extracted from a source, transformed into an archivable format, and then uploaded to a database.

Despite how counterintuitive it may seem at first blush, we want a lightweight tool that runs on a single machine. While we need to handle a large amount of data each day through Civis Platform’s distributed computing infrastructure, we design our ETL data pipelines to be modular and orchestrated by Civis Platform Workflows. Typically, each component in a pipeline is a job that runs in a containerized environment on an AWS EC2 instance; it is within this single-machine environment that we need to run functions concurrently to handle data. Given our context, the big players such as Airflow, Dask, etc. are not a good fit for us.

Previously, we used the third-party package Bonobo as the data pipeline executor engine. Bonobo is an ETL framework for Python that provides tools for building data transformation pipelines using plain Python primitives, and executing them in parallel. However, Bonobo has created a few issues for Civis’s internal tech stack:

  • We wanted a means to make our pipelines (which are primarily I/O-bound jobs) non-blocking, so that each task may operate independently based on the availability of data in its queues, rather than depending on the task preceding it.
  • The synchronous nature of Bonobo’s queue implementation was a drawback. We wanted a more robust queuing system intended for asynchronous programming to develop a producer and consumer architecture that can share data and allow for enhanced queue management.
  • Sometimes, our pipelines needed a way to run on several process workers for each node/task — something Bonobo was unable to provide.
  • Bonobo is no longer maintained. Its latest release supports only Python 3.9 or below.

When looking for a replacement for Bonobo, we realized it was entirely possible to write our own framework — one that is generalized enough to share with the rest of the world.

Enter async-graph-data-flow, leveraging asynchronous programming to run your functions concurrently. In Python, asynchronous programming has received official support since Python 3.4 under the asyncio module, as well as the new async/await syntax. For efficiency, it is perfect for handling I/O-heavy workloads, but doesn’t have the overhead and debugging issues that arise with threading: async-graph-data-flow is built on top of asyncio in the Python standard library, and doesn’t depend on any third-party libraries.

How async-graph-data-flow Works

OpenSource-blogimages-2
  • Asynchronous queues connect the functions: In response to the problems caused by the synchronous nature of Bonobo’s queue implementation, we are using asyncio.queue instances to improve queue size management and data transmission between the nodes. This eliminates the need to wait for the completed execution of an incoming node, and allows us to begin executing all the nodes as soon as data becomes available in the queue.
  • Multiple workers for a specific task: When adding a node to the graph, we have the option to divide long-running or heavy tasks into several asynchronous tasks. The program runs substantially faster as a result.
  • Mixing synchronous and asynchronous code: By accessing the running event loop, we can successfully call non-async code from async code. With this functionality, we can efficiently create our ETL pipelines to interface with Civis Platform using the Civis Python API client, and also take advantage of other non-async-based libraries.
  • Data flow statistics and logging: As the execution unfolds along the graph of async functions, we needed a way to track how much data has flown through a given node, as well as a method to log such information. Async-graph-data-flow provides utilities for these statistics and logging them at a regular interval.

What’s Next


Using async-graph-data-flow to rethink how we build our ETL pipelines:

  • Significantly improves the efficiency of our code
  • Reduces the amount of time required to complete massive tasks
  • Speeds up our development cycle when creating a new pipeline

This blog post has only scratched the surface of what is possible with this package. For more information on how to get started with async-graph-data-flow, its other features, and examples of usage, please see the official documentation.

For instance, here is sample code that uses the async-graph-data-flow package to pull down data about U.S. breweries from Open Brewery DB (at the time of this writing in early 2023, there are around 8,000 of these breweries):


(It’s only fitting that we began this blog post with champagne and ended it with beer, right?)

While async-graph-data-flow is designed for more experienced Python developers who would like to build their own downstream data pipelines and frameworks, Civis Platform users are also going to enjoy the benefits of this new open-source library as we roll it out across Platform’s data integration ecosystem.

Embrace data to elevate your decision-making.

Let’s put your data to work.