How to run a BigQuery query after a Dataflow job completes successfully

I am trying to run a query in BigQuery right after a Dataflow job completes successfully. I have defined three functions in main.py.

The first one runs the Dataflow job. The second one checks the Dataflow job's status. And the last one runs the query in BigQuery.

The trouble is that the second function checks the Dataflow job's status repeatedly, and even after the job completes successfully it does not stop checking. The function deployment then fails with a 'function load attempt timed out' error.

from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
import os
import re
import config
from google.cloud import bigquery
import time

global flag

def trigger_job(gcs_path, body):
    # Launch the Dataflow job from the template stored at gcs_path.
    credentials = GoogleCredentials.get_application_default()
    service = build('dataflow', 'v1b3', credentials=credentials, cache_discovery=False)
    request = service.projects().templates().launch(projectId=config.project_id, gcsPath=gcs_path, body=body)
    response = request.execute()

def get_job_status(location, flag):
    # Find the launched job by name and check whether it has finished.
    credentials = GoogleCredentials.get_application_default()
    dataflow = build('dataflow', 'v1b3', credentials=credentials, cache_discovery=False)
    result = dataflow.projects().jobs().list(projectId=config.project_id, location=location).execute()

    for job in result['jobs']:
        if re.findall(re.escape(config.job_name), job['name']):
            while flag == 0:
                if job['currentState'] != "JOB_STATE_DONE":
                    print('NOT DONE')
                else:
                    flag = 1
                    print('DONE')
                    break

def bq(sql):
    # Submit the query to BigQuery.
    client = bigquery.Client()
    query_job = client.query(sql, location='US')

gcs_path = config.gcs_path
body = config.body
trigger_job(gcs_path, body)
flag = 0
location = 'us-central1'
get_job_status(location, flag)
sql = """CREATE OR REPLACE TABLE `table` AS SELECT * FROM `table`"""
bq(sql)

The Cloud Function timeout is set to 540 seconds, but the deployment fails after 3-4 minutes.

Any help is much appreciated.

1 answer

  • answered 2021-06-18 21:29 Seng Cheong

    It appears from the code snippet provided that your HTTP-triggered Cloud Function is not returning an HTTP response.

    All HTTP-triggered Cloud Functions must return an HTTP response to terminate properly. From the Google documentation, Ensure HTTP functions send an HTTP response (emphasis mine):

    If your function is HTTP-triggered, remember to send an HTTP response, as shown below. Failing to do so can result in your function executing until timeout. If this occurs, you will be charged for the entire timeout time. Timeouts may also cause unpredictable behavior or cold starts on subsequent invocations, resulting in unpredictable behavior or additional latency.

    Thus, you must have a function in your main.py that returns some sort of value, ideally one that can be coerced into a Flask HTTP response.
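
    As a minimal sketch (assuming your function is HTTP-triggered; the entry point name main, the 15-second polling interval, and the 480-second deadline are placeholders of mine), you could move the module-level calls into a single entry point that re-fetches the job state on each check and returns a response on every path, reusing trigger_job, bq, sql, and config from your main.py:

    import time
    from googleapiclient.discovery import build
    from oauth2client.client import GoogleCredentials
    import config

    def main(request):
        # Launch the Dataflow template job (trigger_job from the question).
        trigger_job(config.gcs_path, config.body)

        credentials = GoogleCredentials.get_application_default()
        dataflow = build('dataflow', 'v1b3', credentials=credentials, cache_discovery=False)

        # Re-fetch the job state on each iteration instead of testing a stale
        # snapshot, and give up well before the 540-second function timeout.
        deadline = time.time() + 480
        while time.time() < deadline:
            result = dataflow.projects().jobs().list(
                projectId=config.project_id, location='us-central1').execute()
            for job in result.get('jobs', []):
                if config.job_name in job['name'] and job['currentState'] == 'JOB_STATE_DONE':
                    bq(sql)
                    # Returning a value ends the invocation cleanly.
                    return ('Dataflow job done, query submitted', 200)
            time.sleep(15)

        # Every code path must send an HTTP response, including the failure path.
        return ('Dataflow job did not finish before the deadline', 500)

    Note that the timeout path also returns a response; per the documentation quoted above, any path that never responds keeps the function running until timeout.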