Data Pipelines and Versioning with the Pachyderm Go Client
I know about Gophers, but what is a Pachyderm?
Pachyderm is an open source framework, written in Go, for reproducible data processing. With Pachyderm, you can create language-agnostic data pipelines where the data input and output of each stage of your pipeline are version controlled in Pachyderm’s File System (PFS). Think “git for data.” You can view diffs of your data and collaborate with teammates using Pachyderm commits and branches. Also, if your data pipeline generates an unexpected result, you can debug or validate it by understanding the historical processing steps leading to that result (or even reproducing them exactly).
Pachyderm leverages the container ecosystem (Kubernetes and Docker) to enable this functionality and to distribute your data processing. It can parallelize your computation by only showing a subset of your data to each container within a Pachyderm cluster. A single node either sees a slice of each file (a map job) or a whole single file (a reduce job). The data itself can live in an object store of your choice (usually S3 or GCS), and Pachyderm smartly assigns different pieces of data to be processed by different containers.
As mentioned, you can build your Pachyderm data pipelines using any language or framework (Go, Python, TensorFlow, Spark, Rust, etc.), but Pachyderm does have a nice Go client that lets you launch pipelines, put data into data versioning, pull data out of data versioning, and so on, directly from your Go applications. For example, you could commit metrics from your Go backend service directly to Pachyderm and, on every commit, have Pachyderm automatically update a predictive analysis (written in Go, TensorFlow, Python, or whatever you might prefer) to detect fraudulent activity based on those metrics.
To read more about the Pachyderm project, visit Pachyderm’s website and look through the docs.
A data processing example for this post
In this post, we are going to illustrate some distributed data processing and data versioning with a few simple Go programs and some Pachyderm configuration. If this piques your interest, stay tuned to GoingGo.net for a more detailed series of posts about the Pachyderm Go client, contact @dwhitena on Gophers Slack with questions, and/or participate in an Ultimate Data Science course.
The goal of the data processing here will be to generate statistics about Go projects posted to GitHub. We will:
1. Deploy Pachyderm.
2. Create a Pachyderm pipeline that takes Go repository names (e.g., github.com/docker/docker) as input and outputs a couple of stats/metrics about those repositories.
3. Write a Go program that commits a series of repository names, one at a time, into Pachyderm’s data versioning. For each commit, we will automatically trigger the pipeline created in step 2 to update our stats.
To keep things simple, we will just calculate two statistics/metrics for each repository: the number of lines of Go code and the number of dependencies. Our input data (supplied by the Go program written in step 3) will look something like this:
myusername/projectname
where the github.com/ prefix is implied, and our output data will look like this:
myusername/projectname, 4, 350
where 4 is the number of dependencies in github.com/myusername/projectname and 350 is the number of lines of Go code. As we commit more and more input data, we will update our statistics. For example, if we commit additional input data:
myusername/anotherprojectname
we will have Pachyderm automatically update our results:
myusername/projectname, 4, 350
myusername/anotherprojectname, 8, 427
The output of the pipeline will be version controlled by Pachyderm as well. This is pretty cool, because, if/when we wanted, we could look back in time to see what our stats were at any commit. Further, we could deduce the provenance of the results (i.e., what data and calculations led to those results) and reproduce them exactly. To learn more about this “data provenance” feature, check out this article.
Step 1: Deploying Pachyderm
Our Pachyderm pipeline will run (where else?) in a Pachyderm cluster. Thus, let’s get a Pachyderm cluster up and running. Thankfully, Pachyderm can be deployed in just a few commands locally, or via a deploy command for the Google, Amazon, or Azure cloud platforms.
After completing one of these deploys, you can verify that your Pachyderm cluster is running with the Pachyderm CLI tool pachctl:
$ pachctl version
COMPONENT           VERSION
pachctl             1.3.0
pachd               1.3.0
where pachd is the Pachyderm server daemon.
Step 2a: Creating a Pachyderm Pipeline
To create a Pachyderm pipeline, we need:
- One or more Docker images that will be used to process/manipulate the input data (in this case, to calculate the number of lines of Go code and the number of dependencies), and
- A JSON pipeline specification that tells Pachyderm which images to use, how to parallelize, what to use as input data, etc.
Regarding the first piece, let’s just run wc -l to get the number of lines of Go code and go list to get the number of dependencies for each Go project. We will put these commands in a shell script that can be run in a Docker image built FROM golang.
Aside: even though our “processing” is simple in this example, one of the beauties of Pachyderm is that you can use any Docker images for the data processing. This includes any language or framework and any logic, from simple Unix commands to recurrent neural networks implemented in TensorFlow.
Here is the shell script, stats.sh, that we will use.
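What follows is a minimal sketch of such a script, assuming a simplified go list invocation and simplified Godep cleanup; the exact version we use is in the repo linked under Resources:

#!/bin/bash

# Pachyderm mounts the input repo at /pfs/projects. Each committed
# file contains one GitHub repo name, e.g., "docker/docker".
for f in /pfs/projects/*; do
    REPONAME=$(cat "$f")

    # Pull the project source into the GOPATH (we only need the
    # source on disk, so build errors are ignored).
    go get -d github.com/$REPONAME/... 2>/dev/null

    cd "$GOPATH/src/github.com/$REPONAME" || continue

    # Number of unique dependencies reported by go list.
    DEPS=$(go list -f '{{ join .Deps "\n" }}' ./... 2>/dev/null | sort -u | wc -l)

    # Number of lines of Go code, via wc -l.
    LINES=$(find . -name '*.go' -exec cat {} + | wc -l)

    # /pfs/out is the output data repository of this pipeline stage.
    echo "$REPONAME, $DEPS, $LINES" >> /pfs/out/results

    cd - > /dev/null
done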
This includes the wc -l and go list commands along with some cleanup and things to support Godep. The script will output our metrics given a GitHub Go project name in the variable REPONAME. Note that the metrics are written to /pfs/out, which is how you tell Pachyderm to output something to the output data repository of a pipeline stage (explained further below). Our Docker image is simply the golang image plus this script:
FROM golang
ADD stats.sh /
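To make this image available to the Pachyderm cluster, we build it and push it to Docker Hub under the name referenced in the pipeline specification below:

$ docker build -t dwhitena/stats .
$ docker push dwhitena/stats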
We can then create a JSON Pachyderm pipeline specification, pipeline.json, that uses this image (uploaded to Docker Hub as dwhitena/stats) to process our input data.
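A reconstruction of that specification follows, based on the description after this block; the field names track the Pachyderm 1.3-era pipeline spec, so double-check them against the pipeline spec docs for the version you are running:

{
  "pipeline": {
    "name": "stats"
  },
  "transform": {
    "image": "dwhitena/stats",
    "cmd": ["/bin/bash"],
    "stdin": ["/stats.sh"]
  },
  "parallelism_spec": {
    "strategy": "CONSTANT",
    "constant": 1
  },
  "inputs": [
    {
      "repo": {
        "name": "projects"
      },
      "method": "reduce"
    }
  ]
}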
In this specification, we are telling Pachyderm the following:
- Use the dwhitena/stats image.
- Run the command /bin/bash in the container with the specified stdin.
- We are not parallelizing this processing yet (although we could specify a particular parallelization by simply changing the constant field under parallelism_spec).
- The data input for this pipeline is a “repo” named projects. Remember, Pachyderm’s data versioning is similar to “git for data.” In this case, we are telling the pipeline to look for input in the versioned data repository called projects. We could specify multiple repositories if we wished. When the container runs, it will have access to the specified repos at /pfs/<reponame> (/pfs/projects here).
- We are accessing the input via a reduce method. This means that, as data is committed, the pipeline will only see the new data, and, if we introduced parallelism, Pachyderm could distribute the data over the containers at a block level. For more information on this method and others, see the docs for Combining Partition Unit and Incrementality.
In essence, when new data is committed to a data repository called projects, this pipeline will be triggered and will process the data using the specified image, cmd, and stdin. The pipeline only has one “stage.” That is, we are only specifying one data processing step that takes data from the projects repository and outputs our metrics. If we wanted more stages in our pipeline (e.g., a stage that compiles the project and measures compile time), we would just need to add more JSON blobs to pipeline.json. See Pachyderm’s fruit stand example for a multistage pipeline in action.
Step 2b: Run and Test the Pachyderm Pipeline
At this point, we have the following:
- A script called stats.sh that calculates metrics for a Go project given a GitHub repository name.
- A Docker image dwhitena/stats containing the script.
- A pipeline spec called pipeline.json that runs the script on data committed to a projects data repository.
Before we run this pipeline using the Go client, let’s run it manually to ensure that it works and to gain some intuition about Pachyderm’s pipelining and data versioning. To run the pipeline manually, we first need to create the projects data repository with the pachctl CLI tool:
$ pachctl create-repo projects
Next, let’s create the pipeline with our JSON specification:
$ pachctl create-pipeline -f pipeline.json
At this point, we haven’t committed any data into Pachyderm’s data versioning, and, thus, our pipeline doesn’t have any input data to process. However, we can verify that our pipeline and repository exist:
$ pachctl list-repo
NAME       CREATED          SIZE
projects   18 seconds ago   0 B
stats      5 seconds ago    0 B
$ pachctl list-pipeline
NAME    INPUT      OUTPUT   STATE
stats   projects   stats    running
Notice that an output repository (with the name of our pipeline) has also been created. The output of our stats pipeline will be versioned there.
Now, let’s commit some data into the input data repository called projects. Specifically, we will commit a first file, one.txt, into the projects repository on the master branch, where one.txt includes the GitHub repository name that we want to analyze (docker/docker):
$ echo "docker/docker" | pachctl put-file projects master one.txt -c
This will trigger the first run of our stats pipeline, and we can confirm that the pipeline ran via:
$ pachctl list-job
ID                                 OUTPUT                                     STARTED              DURATION     STATE
4c4f53668e46c20fdeb1286ca971ea1f   stats/9c8c1ad2667d44d586818306ec19f1ec/0   About a minute ago   57 seconds   success
The OUTPUT column above shows the location of the output of our stats pipeline within Pachyderm’s data versioning, in the pattern <repo name>/<branch>/<commit>. To see what our output looks like, we can list the files in the output and get the results file:
$ pachctl list-file stats 9c8c1ad2667d44d586818306ec19f1ec
NAME       TYPE   MODIFIED            LAST_COMMIT_MODIFIED                 SIZE
/results   file   About an hour ago   9c8c1ad2667d44d586818306ec19f1ec/0   27 B
$ pachctl get-file stats 9c8c1ad2667d44d586818306ec19f1ec /results
docker/docker, 559, 798271
The results file shows that our pipeline determined that the github.com/docker/docker project has 559 dependencies and 798,271 lines of Go code. Pretty cool. Let’s commit another repository and see what happens:
$ echo "kubernetes/kubernetes" | pachctl put-file projects master two.txt -c
We can now see that two commits have been made to the projects data repository:
$ pachctl list-commit projects
BRANCH   REPO/ID             PARENT     STARTED             FINISHED            SIZE
master   projects/master/0   <none>     About an hour ago   About an hour ago   14 B
master   projects/master/1   master/0   50 seconds ago      49 seconds ago      22 B
where the second commit added the second file (and thus has a parent commit containing the first file). When we added this second commit, Pachyderm automatically ran our stats pipeline on the newly committed data. We can confirm this with list-job again, and we can also see that there are two commits in our output stats repo (listed below once while the second job was still running, and again after it finished):
$ pachctl list-commit stats
BRANCH                             REPO/ID                                    PARENT   STARTED             FINISHED            SIZE
9c8c1ad2667d44d586818306ec19f1ec   stats/9c8c1ad2667d44d586818306ec19f1ec/0   <none>   About an hour ago   About an hour ago   27 B
e1a8a83729d64ebeb9d8549ae9581e3f   stats/e1a8a83729d64ebeb9d8549ae9581e3f/0   <none>   3 minutes ago                           0 B
$ pachctl list-commit stats
BRANCH                             REPO/ID                                    PARENT   STARTED             FINISHED            SIZE
9c8c1ad2667d44d586818306ec19f1ec   stats/9c8c1ad2667d44d586818306ec19f1ec/0   <none>   2 hours ago         2 hours ago         27 B
e1a8a83729d64ebeb9d8549ae9581e3f   stats/e1a8a83729d64ebeb9d8549ae9581e3f/0   <none>   10 minutes ago      3 minutes ago       64 B
Now, if we get the results file, we will see that the new commit was processed:
$ pachctl get-file stats e1a8a83729d64ebeb9d8549ae9581e3f /results
docker/docker, 559, 798232
kubernetes/kubernetes, 1459, 2800450
Sweet! We can keep committing new files, and the results will keep getting updated. Not only that, because everything is versioned, we can still access the state of the results at any point in history. For example, we can still access the results file at the previous commit:
$ pachctl get-file stats 9c8c1ad2667d44d586818306ec19f1ec /results
docker/docker, 559, 798271
Step 3a: Write a Go program that uses the Pachyderm client
The pachctl CLI is useful, but now let’s use Pachyderm’s Go client to stream a series of Go project names into the projects repo and, in turn, calculate stats for each of the committed projects.
First, create a program, feed.go, that imports Pachyderm’s Go client, connects to our Pachyderm cluster, and creates the projects repo.
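A minimal sketch of this first piece follows, assuming the v1.x-era client API (client.NewFromAddress and CreateRepo); see the godocs linked under Resources for the exact signatures in your version:

package main

import (
    "log"

    "github.com/pachyderm/pachyderm/src/client"
)

func main() {
    // Connect to pachd. 0.0.0.0:30650 assumes a local or
    // port-forwarded cluster; adjust the address for your setup.
    c, err := client.NewFromAddress("0.0.0.0:30650")
    if err != nil {
        log.Fatalf("could not connect to Pachyderm: %v", err)
    }

    // Create the input data repository "projects."
    if err := c.CreateRepo("projects"); err != nil {
        log.Fatalf("could not create repo: %v", err)
    }
}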
Then add a loop that commits files to projects, where each successive file includes the name of a different Go project on GitHub.
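Here is a sketch of that loop, continuing inside main. The project list matches the results shown later in this post; the StartCommit, PutFile, and FinishCommit signatures are v1.x-era assumptions (and fmt and strings join the imports), so verify them against the godocs:

    // projects are the GitHub Go projects we want to analyze.
    projects := []string{
        "docker/docker",
        "kubernetes/kubernetes",
        "hashicorp/consul",
        "spf13/hugo",
        "prometheus/prometheus",
        "influxdata/influxdb",
        "coreos/etcd",
    }

    for i, project := range projects {
        // Start a commit on the master branch of "projects" (the
        // empty string lets Pachyderm determine the parent commit).
        commit, err := c.StartCommit("projects", "", "master")
        if err != nil {
            log.Fatalf("could not start commit: %v", err)
        }

        // Write one file per commit containing the project name.
        fileName := fmt.Sprintf("%d.txt", i)
        if _, err := c.PutFile("projects", commit.ID, fileName,
            strings.NewReader(project+"\n")); err != nil {
            log.Fatalf("could not put file: %v", err)
        }

        // Finishing the commit makes the new data visible and
        // triggers a run of the stats pipeline.
        if err := c.FinishCommit("projects", commit.ID); err != nil {
            log.Fatalf("could not finish commit: %v", err)
        }
    }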
Finally, let’s add the functionality to create the stats pipeline.
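CreatePipeline’s argument list has changed across Pachyderm releases, so treat the following as a sketch mirroring pipeline.json with assumed v1.x-era signatures (ppsclient and pfsclient here would be the github.com/pachyderm/pachyderm/src/client/pps and .../pfs packages):

    // Create the "stats" pipeline: run /stats.sh via /bin/bash in
    // the dwhitena/stats image, reading the "projects" repo via the
    // reduce method, just as in pipeline.json.
    if err := c.CreatePipeline(
        "stats",               // pipeline name
        "dwhitena/stats",      // Docker image
        []string{"/bin/bash"}, // cmd
        []string{"/stats.sh"}, // stdin
        1,                     // constant parallelism
        []*ppsclient.PipelineInput{{
            Repo:   &pfsclient.Repo{Name: "projects"},
            Method: client.ReduceMethod,
        }},
        false, // we are creating, not updating, the pipeline
    ); err != nil {
        log.Fatalf("could not create pipeline: %v", err)
    }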
The entire feed.go program with all of these pieces can be found here.
Step 3b: Running our Go program and examining the results
The above feed.go program will do everything we did in steps 2a and 2b, but directly from Go itself. Moreover, it allows us to automate the commits of input data.
After deleting our previous testing data and pipeline with pachctl delete-all, we can compile and run this program. We will then observe a few things. First, all of our commits to the input repo projects show up:
$ pachctl list-commit projects
BRANCH   REPO/ID             PARENT     STARTED         FINISHED        SIZE
master   projects/master/0   <none>     6 minutes ago   6 minutes ago   13 B
master   projects/master/1   master/0   6 minutes ago   6 minutes ago   21 B
master   projects/master/2   master/1   6 minutes ago   6 minutes ago   16 B
master   projects/master/3   master/2   6 minutes ago   6 minutes ago   10 B
master   projects/master/4   master/3   6 minutes ago   6 minutes ago   21 B
master   projects/master/5   master/4   6 minutes ago   6 minutes ago   19 B
master   projects/master/6   master/5   6 minutes ago   6 minutes ago   11 B
Also, we will see that Pachyderm is running our pipeline for each of the commits of data into projects:
$ pachctl list-job
ID                                 OUTPUT                                     STARTED         DURATION   STATE
d9255dbe5df83e25ba3d3c890e022598   stats/cd309fd23a3745b68dafe56b581d63bf/0   2 minutes ago   -          running
d15c3e962e2ed2412be460db26a21c85   stats/46b8270e1e2c4fdd990d89eaa4b4351b/0   2 minutes ago   -          running
7d7a88b806fefcf8f2166492eb7e32a6   stats/65e0d4edaca7483fa06c7c2443133774/0   2 minutes ago   -          running
35f6c95d843a12f506e3d18c94dfffab   stats/ac98664623a74f029506ec10701387c1/0   2 minutes ago   -          running
fd5b8623ca2556e0da4b71fc205c6f52   stats/f7ebb735f2fa46b894d3ce81e8cb8855/0   2 minutes ago   -          running
16f1108c71e6a0da6dca4712e54d4ddb   stats/ff7a262a03ea4f279e92c6d54c7e8129/0   2 minutes ago   -          running
4c4f53668e46c20fdeb1286ca971ea1f   stats/930e29b046a948649176b04225a547d9/0   2 minutes ago   -          running
Eventually these will finish, and we will see each corresponding commit to our output data repository stats:
$ pachctl list-commit stats
BRANCH                             REPO/ID                                    PARENT   STARTED          FINISHED         SIZE
46b8270e1e2c4fdd990d89eaa4b4351b   stats/46b8270e1e2c4fdd990d89eaa4b4351b/0   <none>   27 minutes ago   59 seconds ago   183 B
65e0d4edaca7483fa06c7c2443133774   stats/65e0d4edaca7483fa06c7c2443133774/0   <none>   27 minutes ago   3 minutes ago    151 B
930e29b046a948649176b04225a547d9   stats/930e29b046a948649176b04225a547d9/0   <none>   27 minutes ago   23 minutes ago   27 B
ac98664623a74f029506ec10701387c1   stats/ac98664623a74f029506ec10701387c1/0   <none>   27 minutes ago   3 minutes ago    116 B
cd309fd23a3745b68dafe56b581d63bf   stats/cd309fd23a3745b68dafe56b581d63bf/0   <none>   27 minutes ago   22 seconds ago   208 B
f7ebb735f2fa46b894d3ce81e8cb8855   stats/f7ebb735f2fa46b894d3ce81e8cb8855/0   <none>   27 minutes ago   3 minutes ago    94 B
ff7a262a03ea4f279e92c6d54c7e8129   stats/ff7a262a03ea4f279e92c6d54c7e8129/0   <none>   27 minutes ago   5 minutes ago    64 B
Finally, we can examine our output file, results, to see all of the nice metrics for the Go projects we supplied:
$ pachctl get-file stats cd309fd23a3745b68dafe56b581d63bf /results
docker/docker, 559, 798232
kubernetes/kubernetes, 1459, 2801166
hashicorp/consul, 127, 357396
spf13/hugo, 15, 39430
prometheus/prometheus, 298, 889665
influxdata/influxdb, 68, 126163
coreos/etcd, 157, 407630
where, as a reminder, the first number in each row is the number of dependencies in the project and the second number is the number of lines of Go code in the project (at least at the time of writing this post). Yay for data versioning, pipelining, and analysis in Go! Be sure to try this out on your own, replacing the projects above with ones that interest you.
Resources
All the code and files mentioned in this post can be found here.
Get started with Pachyderm now by installing it in just a few commands. Also be sure to:
- Join the Pachyderm Slack community for questions, discussions, deployment help, etc.
- Read the Pachyderm docs.
- Read the godocs for the Pachyderm Go client.
- Check out example Pachyderm pipelines.
- Connect with Pachyderm on Twitter.
Also, more generally, make sure to get involved in the Go data science community:
- Join the #data-science channel on Gophers Slack.
- Check out the gopherds org on GitHub. Specifically, visit the resources repo for a list of Go data science packages and community updates.
- Join the GopherDS mailing list.
- Attend community events and workshops, such as data-related talks at Gopher conferences and meetups or public Ardan Labs Ultimate Data Science courses.