Dataflow with Python


introduction

When you want to start doing some data ingestion on the Google Cloud Platform, Dataflow is a logical choice.

Java offers more possibilities (see the built-in I/O transforms), but there might still be reasons why you need to stick to Python (for example, broader use and adoption in your organization).

setup

python version

The Python versions currently supported are 3.6, 3.7 and 3.8. If you are free to choose, pick the latest version.

beam version

Once you have decided on the Python version, check the dependency versions as well. All Python versions work with the latest Apache Beam release, but some other dependencies might hold you back.

dependency management

A Dataflow job is like any other Python application, so you first need to settle on a way to manage the dependencies.
In this post, I will be using pipenv.

When pipenv is installed, you can start installing dependencies right away. pipenv will create a virtual environment and start populating it with the dependencies you install.

apache-beam is the first dependency you should install:

pipenv --python 3.8 install apache-beam

Depending on what you need to achieve, you can install extra dependencies; for example, the gcp extra (pipenv install 'apache-beam[gcp]') adds support for BigQuery, Pub/Sub and Cloud Storage.

pipeline

worker setup

In order to have a correct setup on all workers, Dataflow runs a Python setup script that can be specified as a pipeline option.

Create a new setup.py file with the following content, updating it where needed:

from setuptools import setup, find_packages

VERSION_NUMBER = '0.0.1'

setup(
  name='df-python-example-job',
  version=VERSION_NUMBER,
  description="",
  long_description=open("README.md").read(),
  classifiers=[
    "Programming Language :: Python"
  ],
  keywords='',
  url='',
  packages=find_packages(),
  include_package_data=True,
  zip_safe=False,
  install_requires=[],
  author='Me',
  author_email='me@a-domain.nl'
)

You can get a dependency list via pipenv; paste the output of the following command into the install_requires array:

pipenv run pip freeze | sed 's/^/"/;s/$/",/;$ s/,$//g'

prepare pipeline

arguments

If you need to pass some information to the job depending on the context, you can do that by:

  • passing command line arguments
  • reading a configuration file

command line arguments

You can rely on argparse to do that.

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--environment',
                    dest='environment',
                    choices=['local', 'cloud'],
                    required=True,
                    help='Environment the pipeline is running on: [local or cloud].')
known_args, pipeline_args = parser.parse_known_args(argv)

One important (and arguably indispensable) argument is the environment; it makes it possible to apply different settings for local or cloud execution.

You can also add all the other arguments your pipeline needs.

pipeline_args contains all the arguments that are not explicitly listed; they can be used to set pipeline options.
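
The run and test sections later in this post invoke the module as python3 -m example and call example.run(...), which suggests the argument parsing lives in a run() entry point. A minimal sketch of that structure (an assumption about how the module is organised, not code shown in the original post):

def run(argv=None):
    parser = argparse.ArgumentParser()
    # ... add_argument calls as shown above ...
    known_args, pipeline_args = parser.parse_known_args(argv)

    # pipeline options and pipeline construction follow (see the next sections)


if __name__ == '__main__':
    run()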

configuration file

Instead of passing lots of command line arguments, you can read a configuration file selected based on the environment the pipeline is executed in.

You can first get the environment from the command line argument and then select the correct configuration before reading its content.

import configparser

config = configparser.ConfigParser()
config.read('example-%s.ini' % known_args.environment)
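
As an illustration, a hypothetical example-local.ini is sketched in the comments below (the section and key names are assumptions, not taken from the original project), together with how a value is read from the parsed configuration:

# example-local.ini (hypothetical content):
#
#   [gcp]
#   input_bucket = luminis-df-python-example-in
#   output_bucket = luminis-df-python-example-out
#
# reading a value from the selected configuration
input_bucket = config['gcp']['input_bucket']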

For the cloud environment, you can generate the production configuration with ConfigMaps.

pipeline options

Dataflow has its own options; these options can be read from a configuration file or from the command line.

from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions, StandardOptions

pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(StandardOptions).runner = 'DirectRunner'
google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'luminis-df-python-example'

runner and project are mandatory.

  • runner sets the data processing system the pipeline will run on
  • project sets the Google Cloud project the pipeline will be bound to

When running in the cloud, a different runner needs to be selected. In our case that is the DataflowRunner.

if known_args.environment == 'cloud':
    pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'

You can refer to the list of options to set other aspects of the pipeline.
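
For example, when running on Dataflow you will typically also set a region and a temporary storage location (the region and bucket name below are assumptions, adjust them to your project):

google_cloud_options.region = 'europe-west1'
google_cloud_options.temp_location = 'gs://luminis-df-python-example-tmp/temp'
google_cloud_options.job_name = 'df-python-example-job'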

As mentioned above, the setup script location should be specified as an option as well.

from apache_beam.options.pipeline_options import SetupOptions
pipeline_options.view_as(SetupOptions).setup_file = './setup.py'

pipeline steps

Next you can start setting up the steps of the pipeline.

pipeline = beam.Pipeline(options=pipeline_options)

When you write your pipeline, you describe a chain of transform operations. For this, Apache Beam uses the | and >> operators (their standard behaviour is overridden).

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText

(pipeline | 'read lines' >> ReadFromText(csv_file)
          | 'map lines to object' >> beam.ParDo(MapCsvLineToObjectFn())
          | 'write objects to file as JSON' >> WriteToText(output, file_name_suffix='json', coder=JsonCoder()))

ReadFromText and WriteToText are built-in PTransforms. beam.ParDo() applies a DoFn to every element; MapCsvLineToObjectFn extends beam.DoFn. JsonCoder is a custom coder used to serialize each object as JSON.

To avoid any surprises, declare type hints when writing your DoFns and PTransforms.
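
As a rough sketch of what such a DoFn could look like with type hints (the actual implementation is not shown here, so the parsing logic below is an assumption based on the test data later in this post):

import apache_beam as beam
from apache_beam.typehints import with_input_types, with_output_types


@with_input_types(str)
@with_output_types(dict)
class MapCsvLineToObjectFn(beam.DoFn):
    def process(self, line: str):
        # split a semicolon-separated CSV line into its fields
        parts = line.split(';')
        # skip the header and any line that does not have exactly two fields
        if parts[0] == 'index' or len(parts) != 2:
            return
        yield {'index': parts[0], 'message': parts[1]}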

Your pipeline will run on a distributed environment so always pay attention to the following rules:

  • serializability
  • thread-compatibility
  • idempotence

metrics

When executing your pipeline, you can record metrics during execution. This allows you to:

  • see how far the pipeline is
  • access historical data
  • trigger some follow-ups if the metrics diverge from what is expected

Instantiate a counter:

from apache_beam.metrics import Metrics
counter = Metrics.counter(self.__class__, 'map_csv_to_object_counter')

Increment it when needed:

counter.inc()

There are other metric types available, such as distributions and gauges.
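
For example (a sketch inside the same DoFn; the metric names are illustrative, and line and parts come from the DoFn sketch above):

# a distribution tracks statistics (min, max, mean, sum) over the reported values
line_length = Metrics.distribution(self.__class__, 'csv_line_length')
line_length.update(len(line))

# a gauge records the latest reported value
last_index = Metrics.gauge(self.__class__, 'last_index_seen')
last_index.set(int(parts[0]))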

The metrics are visible both while the job is running and after it has finished.

pipeline state

At any time, a pipeline is in a given state. The possible states depend on the type of runner; not all states are available for every runner (sometimes a state simply doesn't make sense for a particular runner).

You don't necessarily want to be fetching the state of a pipeline right after launching it; you can do that as part of a different flow. The code below starts the pipeline and waits until it finishes (successfully or with an error).

result = pipeline.run()
result.wait_until_finish()

result.state contains the final state of the pipeline. Note that exceptions raised during execution are not caught for you, and checking the state is of no use in that situation; you need to throw and catch your own exceptions.

Having the state of the pipeline at the end makes it possible to execute post-pipeline steps.
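
A minimal sketch of such a post-pipeline step, continuing from the code above (notify_success() is a hypothetical helper, not part of the original example):

import logging

from apache_beam.runners.runner import PipelineState

if result.state == PipelineState.DONE:
    notify_success()  # hypothetical follow-up step
else:
    logging.error('Pipeline finished in state %s', result.state)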

test

There are different ways to test a pipeline: you can redefine your pipeline steps and run them with TestPipeline, or you can pass static data to your script and check the output (end to end).
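
A unit-style test with TestPipeline could look like the sketch below (it reuses the MapCsvLineToObjectFn from the pipeline above and the assert helpers shipped with Beam):

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


def test_map_csv_line_to_object():
    with TestPipeline() as p:
        output = (p
                  | beam.Create(['1;test1', '2;test2'])
                  | beam.ParDo(MapCsvLineToObjectFn()))
        assert_that(output, equal_to([
            {'index': '1', 'message': 'test1'},
            {'index': '2', 'message': 'test2'},
        ]))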

For the end-to-end approach, continuing on the example above, you want to check that the conversion from CSV lines to JSON works correctly. You first define the inputs and the expected output:

CSV = [
   "index;message",
   "1;test1",
   "2;test2",
   "3;test3;incorrect",
]
EXPECTED_OBJECTS = [
  {
    "index": "1",
    "message": "test1"
  },
  {
    "index": "2",
    "message": "test2"
  },
]

Then you can create a temporary file containing the items in the CSV array. You can use the tempfile module for that.

@staticmethod
def create_temp_file(contents: list = None):
    c_dump_file = tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.csv')
    if contents is not None:
        for line in contents:
            c_dump_file.write(line.encode('utf-8'))
            c_dump_file.write('\n'.encode('utf-8'))
    # close the file outside the loop so all lines are written
    c_dump_file.close()
    return c_dump_file.name

You can then start your pipeline:

output_file = create_temp_file()
csv_file = create_temp_file(self.CSV)

example.run(
   ['--environment=local',
    '--output=%s.result' % output_file,
    '--csv-file=%s' % csv_file])

Beam shards its output, meaning the result is split into multiple files following the standard suffix naming convention, so you need to merge the shards before checking them.

import json

from apache_beam.testing.util import open_shards

results = []
with open_shards(output_file + '.result-*-of-*') as result_file:
    for line in result_file:
        results.append(json.loads(line))

You can then check if the generated output is the same as what is expected:

self.assertCountEqual(self.EXPECTED_OBJECTS, results, 'Result and expected object list are different')

run

execute locally

pipenv run python3 -m example --environment local --csv-file example.csv --output example

execute on Google Cloud

Executing on Google Cloud means executing with the Dataflow runner. In our example, reading a local file doesn't make sense, so we need to read from and write to a Google Cloud Storage bucket.

ReadFromText and WriteToText handle that transparently, so no code change is needed. Note that the csv-file and output parameters require the gs:// scheme for GCS.

  1. create the input and output buckets: luminis-df-python-example-in and luminis-df-python-example-out
  2. upload the CSV file to the input bucket.

pipenv run python3 -m example --environment cloud \
--csv-file gs://luminis-df-python-example-in/example.csv \
--output gs://luminis-df-python-example-out/example

deploy

In most cases, you want your pipeline to be executed multiple times (on a schedule).
The most straightforward way to deploy your pipeline is to:

  • build a Docker image based on the code above
  • upload it to your GCP registry
  • execute the command above in the way that fits your own organization
    • build an image that ships cron as the main process and add the command as a cron entry
    • use Kubernetes CronJob

image with cron

Create a file containing the cron entry, example-cron:

0 1 * * * . /env.sh; cd / && pipenv run python3 -m example --environment cloud \
--csv-file gs://luminis-df-python-example-in/example.csv \
--output gs://luminis-df-python-example-out/example > /proc/1/fd/1 2>/proc/1/fd/2

In order to have the correct environment variables available when running Python from cron, a start script first dumps them to /env.sh (which the cron entry above sources); this script is then used to start the cron daemon.

start.sh:

#!/bin/bash
printenv | sed 's/^\(.*\)$/export \1/g' > /env.sh
chmod +x /env.sh
cron -f

Dockerfile:

FROM python:3.8-buster
RUN apt-get update && apt-get install -y -qq --no-install-recommends cron
COPY example-cron /etc/cron.d/example-cron
RUN chmod 0644 /etc/cron.d/example-cron
RUN crontab /etc/cron.d/example-cron
COPY *.py /
COPY setup.py /
COPY Pipfile /
COPY Pipfile.lock /
COPY start.sh /usr/bin

RUN chmod +x /usr/bin/start.sh
RUN pip install pipenv
RUN pipenv install
CMD ["start.sh"]

Build a docker image:

docker build -t df-python-example-job:0.0.1 .

Conclusion

I hope this post has given you some pointers to start implementing your own pipeline.