Dynamic TaskGroup in Airflow 2

I have a function, run_step, that creates a dynamic number of EMR tasks inside a task group. I want to keep this function in a separate file, helpers.py, so that other DAGs can reuse it and I don't have to rewrite it. In the examples below I have hard-coded some values for clarity; in the real code they are variables:

# helpers.py
from airflow.utils.task_group import TaskGroup
# EMR import paths depend on the installed amazon provider version
from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor

def run_step(my_group_id, config, dependencies):
    task_group = TaskGroup(group_id=my_group_id)

    # One add-step task and one sensor per config entry
    for c in config:
        task_id = 'start_' + c['task_name']
        task_name = c['task_name']

        add_step = EmrAddStepsOperator(
            task_group=my_group_id,
            task_id=task_id,
            job_flow_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='emr', key='return_value') }}",
            steps=create_emr_step(args=config[c], d=dependencies[c]),
            aws_conn_id='aws_default'
        )

        wait_for_step = EmrStepSensor(
            task_group=my_group_id,
            task_id='wait_for_' + task_name + '_step',
            job_flow_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='emr', key='return_value') }}",
            step_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='" + f"{my_group_id}.{task_id}" + "', key='return_value')[0] }}"
        )

        # The sensor waits on the step submitted by add_step
        add_step >> wait_for_step

    return task_group
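
To make the intent concrete, here is a small plain-Python sketch (no Airflow needed) of the task IDs and the step_id template I expect run_step to generate. It assumes config is a list of dicts with a 'task_name' key and that Airflow prefixes each task ID with its group ID, which is the TaskGroup default. The helpers expected_task_ids and step_id_template and the sample task names are hypothetical, for illustration only:

```python
def expected_task_ids(group_id, config):
    """Return the (add_step, wait_for_step) task IDs expected per config item.

    Assumes config is a list of dicts with a 'task_name' key, and that
    task IDs inside a group are prefixed with '<group_id>.'.
    """
    pairs = []
    for c in config:
        name = c['task_name']
        add_id = f"{group_id}.start_{name}"
        wait_id = f"{group_id}.wait_for_{name}_step"
        pairs.append((add_id, wait_id))
    return pairs


def step_id_template(group_id, task_id):
    """Build the same Jinja string as the step_id argument in run_step."""
    return (
        "{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='"
        + f"{group_id}.{task_id}"
        + "', key='return_value')[0] }}"
    )


# Hypothetical sample config with two entries
sample_config = [{'task_name': 'load'}, {'task_name': 'transform'}]
pairs = expected_task_ids('execute_steps', sample_config)
template = step_id_template('execute_steps', 'start_load')
print(pairs)
print(template)
```

So for two config entries I would expect four tasks in total, all under the execute_steps group.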

The code in my_dag.py which calls this function looks like:

execute_my_step = create_emr_step(
    my_group_id='execute_steps',
    config=my_tasks,
    dependencies=my_dependencies
)

some_steps >> execute_my_step

I expect this to produce a task group containing two tasks (an add-step task and a sensor) for every item in config, but it produces only a single task labeled create_emr_step, with no task group. I also tried creating the TaskGroup in the DAG file (and made the necessary changes to run_step), as shown below, but that did not work either:

with TaskGroup('execute_steps') as execute_steps:
    execute_my_step = create_emr_step(
        my_group_id='execute_steps',
        config=my_tasks,
        dependencies=my_dependencies
    )

Is it possible to do this? I need to generate the steps dynamically because our pipeline is very large. I was doing this successfully with SubDAGs in a similar way, but I can't figure out how to make it work with task groups. Would it be easier to write my own operator?
