If you need to bring a lot of data into your app, a data pipeline can help. This article describes how to build a simple data pipeline with Ruby on Rails, Sidekiq, Redis, and Postgres.
What’s a data pipeline?
A data pipeline takes information from one place and puts it somewhere else. It likely transforms that data along the way. In enterprise circles, this is called ETL, or extract-transform-load.
Generally, data pipelines:
- Ingest a lot of data at once
- Ingest new or updated data on a schedule
Splitting data into small jobs
A reliable pipeline starts big and reduces data into small slices that can be independently processed. Bigger jobs “fan out” into small ones. The smallest jobs, handling one record each, do most of the work.
Breaking work into smaller jobs helps handle failure. If the pipeline ingests 1,000,000 records, chances are that a few of them are invalid. If those invalid records are processed in their own jobs, then they can fail without impacting other records.
Lobby Focus, a project of mine, ingests United States lobbying data from the U.S. Senate. There are about 1,000,000 lobbying records from 2010-2020. Processing them all in a single job would be risky, as the entire job would fail if even one record triggered an error. Instead, Lobby Focus breaks down data into small slices and ends up processing each record in its own job. To bring in 1,000,000 lobbying records, it takes 1,001,041 jobs.
Pipelines can handle a ton of jobs with ease. Splitting up work into many small jobs offers advantages:
- Most jobs deal with only one record. If a job fails, it’s easy to pin down the problematic record, fix it, and retry it. The rest of the pipeline continues uninterrupted.
- The pipeline can ingest data in parallel, with many small jobs running at once.
- It’s easy to track progress by looking at how many jobs succeeded and how many remain.
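The fan-out idea can be sketched in plain Ruby. This is an illustration only (a real pipeline would use Sidekiq workers and a Redis-backed queue, as described below); the record values are made up:

```ruby
# Sketch of the fan-out pattern: a batch job enqueues one small job per
# record, so a bad record fails alone instead of sinking the whole batch.
queue = []

# The "big" job: fans out one job per record.
batch = ["ok-1", "ok-2", "bad", "ok-3"]
batch.each { |record| queue << record }

succeeded = []
failed    = []

# Workers drain the queue; each job handles exactly one record.
until queue.empty?
  record = queue.shift
  begin
    raise "invalid record" if record == "bad"
    succeeded << record
  rescue RuntimeError
    failed << record # only this one job fails; the rest keep going
  end
end

succeeded # => ["ok-1", "ok-2", "ok-3"]
failed    # => ["bad"]
```

The failed job can be retried on its own once the record is fixed, which is exactly what Sidekiq's retry queue provides out of the box.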
A Ruby on Rails pipeline stack
This stack, used in the example below, makes for a solid data pipeline:
- Ruby on Rails interfaces with all the pieces below and has a strong ecosystem of libraries.
- Sidekiq manages data pipeline jobs, providing visibility into progress and errors.
- Redis backs Sidekiq and stores the job queue.
- Postgres stores intermediate and final data.
- Amazon S3 serves as a staging area for source data.
Managed platforms like Heroku or Cloud 66 are good hosting options for this stack.
Real-world example: U.S. Senate lobbying data
Lobby Focus ingests lobbying data from the U.S. Senate and makes it accessible to users. Here’s a peek into its data pipeline, which breaks data into small jobs and uses the stack above.
Step 1: Scrape
1 Sidekiq job deals with 1,000,000 records
- Gathers a list of ZIP files from the U.S. Senate, using HTTParty to retrieve the webpage and Nokogiri to parse it
- Launches a download job for each ZIP file. To ingest lobbying data from 2010 to 2020, it launches 40 download jobs for 40 ZIP files.
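A dependency-free sketch of the scrape step: the real job fetches the page with HTTParty and parses it with Nokogiri, but here the page is an inline string and a regexp stands in for the parser. The URLs are hypothetical, not the Senate site's actual layout:

```ruby
# Scrape step: find every ZIP link on the listing page, then fan out
# one download job per link.
html = <<~HTML
  <a href="/downloads/2019_1.zip">2019 Q1</a>
  <a href="/downloads/2019_2.zip">2019 Q2</a>
  <a href="/about">About</a>
HTML

# scan with one capture group returns [["..."], ["..."]]; flatten it.
zip_urls = html.scan(/href="([^"]+\.zip)"/).flatten

# In the real job, each URL would be handed to a Sidekiq download job:
#   zip_urls.each { |url| DownloadJob.perform_async(url) }
zip_urls # => ["/downloads/2019_1.zip", "/downloads/2019_2.zip"]
```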
Step 2: Download
40 Sidekiq jobs deal with 25,000 records each
Parameter: ZIP file URL
- Checks the last-modified header to see if the current version was already processed. If so, aborts the job.
- Downloads the ZIP and unzips its contents into Amazon S3, using rubyzip and aws-sdk-s3.
- Launches a process job for each XML file in the ZIP.
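The "skip if unchanged" check can be reduced to a small comparison. In the real job the remote timestamp comes from the server's Last-Modified header and the stored one from Postgres; here both are supplied directly, and the method name is illustrative:

```ruby
require "time"

# Returns true if the remote file is newer than what we last processed.
def should_download?(remote_last_modified, stored_last_modified)
  return true if stored_last_modified.nil?     # never fetched before
  remote_last_modified > stored_last_modified  # fetch only if newer
end

stored = Time.parse("2020-01-01 00:00:00 UTC")
should_download?(Time.parse("2020-02-01 00:00:00 UTC"), stored) # => true
should_download?(Time.parse("2020-01-01 00:00:00 UTC"), stored) # => false
should_download?(Time.parse("2019-12-01 00:00:00 UTC"), nil)    # => true
```

Aborting early here saves the pipeline from re-unzipping and re-processing 25,000 records that haven't changed.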
Step 3: Process
1,000 Sidekiq jobs deal with 1,000 records each
Parameter: XML file URL (S3)
- Parses the XML with Nokogiri
- Adds the content of each record in the XML file to Postgres, as a raw XML string.
- Launches a load job for each record in the XML file, passing the Postgres record ID.
Postgres can bear a high volume of record inserts. The pipeline uses it as an intermediate data store. The next job, load, receives the Postgres record ID and accesses it to get the XML string. Using Postgres to pass the XML avoids passing giant parameters between jobs. Sidekiq recommends small job parameters to keep the queue lean and preserve Redis memory.
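The size difference is easy to demonstrate: Sidekiq serializes job arguments to JSON and stores them in Redis, so passing the raw XML multiplies queue memory, while a record ID costs a few bytes. The sizes below are illustrative:

```ruby
require "json"

raw_xml = "<filing>" + ("x" * 100_000) + "</filing>" # one large record

# Job arguments as Sidekiq would serialize them into Redis:
by_value = JSON.generate([raw_xml]).bytesize  # over 100 KB per job
by_id    = JSON.generate([123_456]).bytesize  # a few bytes per job

by_value > 100_000 # => true
by_id    < 20      # => true
```

Across 1,000,000 load jobs, that is the difference between a queue measured in gigabytes and one measured in megabytes.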
Step 4: Load
1,000,000 Sidekiq jobs deal with 1 record each
Parameter: Postgres record ID
- Retrieves the XML string from Postgres and parses it with Nokogiri
- Validates data and aborts the job with an error if there is an issue
- Upserts final data record into Postgres
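A pure-Ruby sketch of the load step: parse the stored XML, validate it with a strict conversion, and upsert by a unique key. The real job uses Nokogiri and Postgres; here REXML from the standard library and a hash stand in, and the field names are made up:

```ruby
require "rexml/document"

STORE = {} # stands in for the Postgres table, keyed by filing ID

def load_record(raw_xml)
  filing = REXML::Document.new(raw_xml).root
  id     = filing.attributes["id"]
  amount = Integer(filing.attributes["amount"]) # raises on "n/a"
  STORE[id] = { id: id, amount: amount }        # upsert: same key overwrites
end

load_record('<filing id="F1" amount="5000"/>')
load_record('<filing id="F1" amount="5000"/>') # re-run: no duplicate
STORE.size # => 1

begin
  load_record('<filing id="F2" amount="n/a"/>')
rescue ArgumentError
  # invalid record: this one job fails; the rest of the pipeline continues
end
STORE.size # => 1
```

Note that re-running the same load is harmless because the upsert is keyed on a unique ID; that property is what the idempotency tip below the pipeline walkthrough is about.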
Breaking out data loads into small jobs is key to a resilient pipeline. Here are some more tips:
Make jobs idempotent
Any job should be able to run twice without duplicating data. In other words, jobs should be idempotent.
Look for a unique key in the source data and use it to avoid writing duplicate data.
For instance, you can set a Postgres unique index on the key. Then, use Rails’ create_or_find_by method, which creates the record, or, if the unique index reports that the key already exists, returns the existing record instead of raising a duplicate-key error.
Use constraints to reject invalid data
Set constraints to stop invalid data from coming in. If a field should always have a value, use a Postgres constraint to require that field. Use Ruby’s strict conversion methods to throw an error and abort a job if you’re expecting one data type (say, an integer like 3) but get another (say, a string like “n/a”).
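The strict conversion methods raise instead of guessing. Kernel#Integer rejects "n/a", where String#to_i would silently return 0 and corrupt the data:

```ruby
"n/a".to_i   # => 0   (silent bad data slips through)
Integer("3") # => 3   (strict: valid integer accepted)

begin
  Integer("n/a") # strict: raises rather than coercing
rescue ArgumentError
  # abort the job here so the bad record surfaces in Sidekiq's retry queue
end
```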
Data is never perfect. If you catch invalid data errors early, you can adjust your pipeline to handle unexpected values.
What to read next
A job management library like Sidekiq is critical to a data pipeline. Read more about Sidekiq, the backbone of this example. Its best practices docs are small and useful. If you’re not using Ruby, find the top job management libraries for your language.
To practice creating a pipeline, find a public dataset that interests you and build a pipeline for it. The United States Government, for instance, lists its open government data on data.gov. Datasets range from physician comparison data to a database of every dam in the U.S.
Finally, check out managed solutions like Google Dataflow for particularly large pipelines.
Have questions or want advice? Send me a tweet @coreybeep
Photo by Victor Garcia on Unsplash
5 replies on “Data pipelines in Ruby on Rails”
How feasible is RoR for ETL-type tasks? I mean, ETL has a different set of challenges:
Extraction: supporting multiple file formats, huge file sizes, data cleansing, easily switching data source connectors (e.g. SFTP for development but AWS S3 for production)
Transformation: conversion to JSON/XML, serialization
Loading: sending transformed data to multiple destinations like an S3 bucket, a Kinesis stream, or a database.
I recently worked with Apache Camel (Java) for my ETL project, which is an integration framework built on the concepts of EIP, but I couldn’t find a similar framework in Ruby.
In theory, each step you’re describing is possible to do in Ruby / RoR, using the methods described in this article. If you go that route, I’d break the pipeline down into small jobs as much as possible. For instance, if you’re writing transformed data to three sources, there could be a separate job for each source that is responsible for writing a record or set of records to it. You’d want to use small jobs and file streaming to minimize memory use (you mentioned huge file sizes).
Also check out Kiba, an ETL for Ruby.
There are also managed ETL solutions from Google Cloud (like Dataflow) and AWS that run data pipelines on their SDKs, but they may not support Ruby.
This seems like it would be very slow. How long does it take to load all the data from your example?
Most of the pipeline can be horizontally scaled. For instance, hundreds of jobs could be parsing XML files at once. You can tweak Sidekiq’s concurrency settings and add worker instances to speed up the pipeline.
The bottlenecks in the pipeline are the pieces that can only be vertically scaled – Redis and Postgres. Increasing I/O capacity, CPU, and/or memory will speed them up.
Lobby Focus processes new lobbying records each day, and there are only a few hundred of those at most. So, the daily runs are fast. The full historical load – 1,000,000 records – takes about 6 hours, but I’ve not horizontally + vertically scaled to speed that up. You could probably cut that in half with powerful Postgres and Redis instances + more Sidekiq worker instances.
Managed ETL solutions like Google Dataflow (backed by Apache Beam) let you throw a ton of power at a pipeline and are optimized for huge datasets. If you needed to process millions or billions of records a day quickly, then I’d take a look at those.