How to read BigQuery from a PCollection in Dataflow

I have a PCollection of objects that I get from Pub/Sub, say:

 PCollection<Student> pStudent;

One of the Student attributes is a studentID. I want to read the class_code attribute from BigQuery for that student ID and set the class_code I get from BQ on the Student object in the PCollection.

Does anyone know how to implement this? I know that Beam has BigQueryIO, but how can I use it when the query criteria I want to execute in BQ come from the Student object (studentID) in the PCollection, and how can I set the value from the BigQuery result back onto the PCollection?

1 answer

  • answered 2018-11-08 14:36 Guillem Xercavins

    I considered two options to do this. One is to use BigQueryIO to read the whole table and either materialize it as a side input or use CoGroupByKey to join all the data. The other, which I implemented here, is to use the Java Client Library directly.
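    For reference, the side-input alternative could look roughly like this. This is only a sketch and I have not run this variant: it assumes the Beam Java SDK, a `students` PCollection of names as in the full example below, and the same `PROJECT_ID.test.students` table.

```java
// Sketch of the side-input alternative (not the approach implemented below).
// BigQueryIO reads the whole test.students table once; the rows are
// materialized as a Map side input and each element is joined in a ParDo.
PCollection<KV<String, String>> grades = p
    .apply("Read all grades", BigQueryIO.readTableRows().from("PROJECT_ID:test.students"))
    .apply("To KV", MapElements
        .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
        .via(row -> KV.of((String) row.get("name"), (String) row.get("grade"))));

final PCollectionView<Map<String, String>> gradesView =
    grades.apply("As map", View.asMap());

PCollection<KV<String, String>> marks = students
    .apply("Join with side input", ParDo.of(new DoFn<String, KV<String, String>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            Map<String, String> lookup = c.sideInput(gradesView);
            // Emit an empty grade if the student is missing from the table
            c.output(KV.of(c.element(), lookup.getOrDefault(c.element(), "")));
        }
    }).withSideInputs(gradesView));
```

    This trades one query per element for a single table scan, at the cost of keeping the whole table in worker memory, so it only makes sense if the lookup table is reasonably small.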

    I created some dummy data with:

    $ bq mk test.students name:STRING,grade:STRING
    $ bq query --use_legacy_sql=false 'insert into test.students (name, grade) values ("Yoda", "A+"), ("Leia", "B+"), ("Luke", "C-"), ("Chewbacca", "F")'
    

    which looks like this:

    (screenshot of the `test.students` table with the four rows above)

    and then, within the pipeline, I generate some input dummy data:

    Create.of("Luke", "Leia", "Yoda", "Chewbacca")
    

    For each one of these "students" I fetch the corresponding grade from the BigQuery table, following the approach in this example. Keep in mind that issuing one query per element has rate (quota) and cost implications, depending on your data volume. Full example:

    public class DynamicQueries {
    
        private static final Logger LOG = LoggerFactory.getLogger(DynamicQueries.class);
    
        @SuppressWarnings("serial")
        public static void main(String[] args) {
            PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
            Pipeline p = Pipeline.create(options);
    
            // create input dummy data     
            PCollection<String> students = p.apply("Read students data", Create.of("Luke", "Leia", "Yoda", "Chewbacca").withCoder(StringUtf8Coder.of()));
    
            // ParDo to map each student with the grade in BigQuery
            PCollection<KV<String, String>> marks = students.apply("Read marks from BigQuery", ParDo.of(new DoFn<String, KV<String, String>>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws Exception {
                    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    
                    QueryJobConfiguration queryConfig =
                        QueryJobConfiguration.newBuilder(
                          "SELECT name, grade "
                              + "FROM `PROJECT_ID.test.students` "
                              + "WHERE name = "
                              + "\"" + c.element() + "\" "  // fetch the appropriate student
                              + "LIMIT 1")
                            .setUseLegacySql(false) // Use standard SQL syntax for queries.
                            .build();
    
                    // Create a job ID so that we can safely retry.
                    JobId jobId = JobId.of(UUID.randomUUID().toString());
                    Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
    
                    // Wait for the query to complete.
                    queryJob = queryJob.waitFor();
    
                    // Check for errors
                    if (queryJob == null) {
                      throw new RuntimeException("Job no longer exists");
                    } else if (queryJob.getStatus().getError() != null) {
                      throw new RuntimeException(queryJob.getStatus().getError().toString());
                    }
    
                    // Get the results.
                    TableResult result = queryJob.getQueryResults();
    
                    String mark = "";
    
                    for (FieldValueList row : result.iterateAll()) {
                        mark = row.get("grade").getStringValue();
                    }
    
                    c.output(KV.of(c.element(), mark));
                }
            }));
    
            // log to check everything is right
            marks.apply("Log results", ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws Exception {
                    LOG.info("Element: " + c.element().getKey() + " " + c.element().getValue());
                    c.output(c.element());
                }
            }));
    
            p.run();
        }
    }
    

    And the output is:

    Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
    INFO: Element: Yoda A+
    Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
    INFO: Element: Luke C-
    Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
    INFO: Element: Chewbacca F
    Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
    INFO: Element: Leia B+
    

    (Tested with the BigQuery Java client 1.22.0 and the 2.5.0 Java SDK for Dataflow)
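    One final caveat: the example builds the SQL by concatenating the element value into the query string, which breaks if a name contains quotes. The client library supports named query parameters, so a safer variant of the query construction (an untested sketch against the same `QueryJobConfiguration` API) would be:

```java
// Sketch: the same lookup query, but with a named parameter instead of
// string concatenation (avoids quoting/injection problems).
QueryJobConfiguration queryConfig =
    QueryJobConfiguration.newBuilder(
            "SELECT name, grade "
                + "FROM `PROJECT_ID.test.students` "
                + "WHERE name = @name "
                + "LIMIT 1")
        .addNamedParameter("name", QueryParameterValue.string(c.element()))
        .setUseLegacySql(false) // query parameters require standard SQL
        .build();
```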