Google Dataflow / Apache Beam Python - Side-Input from PCollection kills performance
We are running logfile parsing jobs in Google Dataflow using the Python SDK. The data is spread over several hundred daily logs, which we read via a file pattern from Cloud Storage. The total data volume is about 5-8 GB (gz files), with 50-80 million lines.
loglines = p | ReadFromText('gs://logfile-location/logs*-20180101')
In addition, we have a small mapping CSV that maps logfile entries to human-readable text. It has about 400 lines and is about 5 KB in size.
For example, a logfile entry with [param=testing2] should be mapped to "Customer requested 14day free product trial" in the final output.
We do this in a simple beam.Map with sideinput, like so:
customerActions = loglines | beam.Map(map_logentries,mappingTable)
where map_logentries is the mapping function and mappingTable is said mapping table.
However, this only works if we read the mapping table in native python via open() / read(). If we do the same utilising the beam pipeline via ReadFromText() and pass the resulting PCollection as side-input to the Map, like so:
mappingTable = p | ReadFromText('gs://side-inputs/category-mapping.csv')
customerActions = loglines | beam.Map(map_logentries,beam.pvalue.AsIter(mappingTable))
performance breaks down completely to about 2-3 items per second.
Now, my questions:
- Why would performance break so badly, what is wrong with passing a PCollection as side-input?
- If it is not recommended to use PCollections as side-input, how is one supposed to build a pipeline that needs mappings that cannot or should not be hard-coded into the mapping function?
For us, the mapping does change frequently and I need to find a way to have "normal" users provide it. The idea was to have the mapping csv available in Cloud Storage, and simply incorporate it into the Pipeline via ReadFromText(). Reading it locally involves providing the mapping to the workers, so only the tech-team can do this.
I am aware that there are caching issues with side-inputs, but surely this should not apply to a 5 KB input.
All code above is pseudo code to explain the problem. Any ideas and thoughts on this would be highly appreciated!
The code looks fine. However, since mappingTable is a mapping, wouldn't beam.pvalue.AsDict be more appropriate for your use case?
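To make the AsDict suggestion concrete, here is a minimal sketch. It assumes the mapping CSV has two columns (key, description) with no header row and no blank lines, and that log lines carry the key as [param=<key>]; neither format is confirmed by the question. The helper names parse_mapping_line and add_mapping_steps are hypothetical, and map_logentries is only a guess at what the original function does.

```python
import csv
import re

# Assumed log format: the key appears as "[param=<key>]" somewhere in the line.
PARAM_RE = re.compile(r"\[param=([^\]]+)\]")


def parse_mapping_line(line):
    """Turn one CSV line 'key,description' into a (key, description) tuple."""
    key, description = next(csv.reader([line]))[:2]
    return key.strip(), description.strip()


def map_logentries(logline, mapping):
    """Return the description for the param key found in a log line.

    `mapping` arrives as a plain dict when it is passed via beam.pvalue.AsDict.
    Lines without a param are returned unchanged; unknown keys map to themselves.
    """
    match = PARAM_RE.search(logline)
    if match is None:
        return logline
    key = match.group(1)
    return mapping.get(key, key)


def add_mapping_steps(p):
    """Attach the read and map steps to an existing pipeline object `p`."""
    # Imported here so the two helpers above can be tried without Beam installed.
    import apache_beam as beam
    from apache_beam.io import ReadFromText

    loglines = p | "ReadLogs" >> ReadFromText("gs://logfile-location/logs*-20180101")
    mapping_table = (
        p
        | "ReadMapping" >> ReadFromText("gs://side-inputs/category-mapping.csv")
        # AsDict needs a PCollection of (key, value) pairs.
        | "ToKeyValue" >> beam.Map(parse_mapping_line)
    )
    return loglines | "MapEntries" >> beam.Map(
        map_logentries, beam.pvalue.AsDict(mapping_table)
    )
```

With AsDict, each lookup inside map_logentries is a dictionary access rather than a scan over an iterable, which matches how a mapping table is normally used.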
mappingTable is small enough, so a side input is a good use case here. Given that mappingTable is also static, you can load it from GCS in the start_bundle function of your DoFn. See the answer to this post for more details. If mappingTable becomes very large in the future, you can also consider converting your PCollection of key-value pairs and join them using
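
The start_bundle suggestion above could look roughly like the following sketch. It again assumes a two-column CSV (key, description) and the [param=<key>] log format, which the question does not confirm. parse_mapping, describe, and make_map_fn are hypothetical names; FileSystems.open is Beam's filesystem-agnostic file opener, which handles gs:// paths when the GCP extras are installed.

```python
import csv
import io
import re

# Assumed log format: the key appears as "[param=<key>]" somewhere in the line.
PARAM_RE = re.compile(r"\[param=([^\]]+)\]")


def parse_mapping(text):
    """Parse CSV text with 'key,description' rows into a dict."""
    return {
        row[0].strip(): row[1].strip()
        for row in csv.reader(io.StringIO(text))
        if len(row) >= 2  # skip blank or malformed rows
    }


def describe(logline, mapping):
    """Return the description for the param key in a log line, if there is one."""
    match = PARAM_RE.search(logline)
    if match is None:
        return logline
    return mapping.get(match.group(1), match.group(1))


def make_map_fn(mapping_path):
    """Build a DoFn that loads the mapping itself instead of taking a side input."""
    # Imported here so the helpers above can be tried without Beam installed.
    import apache_beam as beam
    from apache_beam.io.filesystems import FileSystems

    class MapLogEntries(beam.DoFn):
        def start_bundle(self):
            # Called before each bundle: read the small CSV into memory.
            f = FileSystems.open(mapping_path)
            try:
                self.mapping = parse_mapping(f.read().decode("utf-8"))
            finally:
                f.close()

        def process(self, logline):
            yield describe(logline, self.mapping)

    return MapLogEntries()
```

It would be used as customerActions = loglines | beam.ParDo(make_map_fn('gs://side-inputs/category-mapping.csv')). Since the file is read from Cloud Storage at run time, non-technical users can update the CSV without touching the pipeline code.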