Dagster: how to reexecute failed steps of a pipeline?

I created a test pipeline and it fails mid-way. I want to programmatically re-execute it but starting at the failed step of the pipeline and move forward. I do not want to repeat execution of the earlier, successful steps.

from dagster import DagsterInstance, execute_pipeline, pipeline, solid, reexecute_pipeline
from random import random

instance = DagsterInstance.ephemeral()

def step1(context, data):
   return range(10), ('a' + i for i in range(10))

def step2(context, step1op):
  x,y = step1op

  # simulation of noise
  xx = [el * (1 + 0.1 * random()) for el in x]
  xx2 = [(el - 1)/el for el in xx]
  return zip(xx, xx2), y

def step3(context, step2op):
   x, y = step2op

   return x, y

run_config = {...}

def inputs_pipeline():

1 answer

  • answered 2020-11-23 14:21 sophros

    Programmatical re-execution of part of the pipeline require identifying ID of a parent solid which is available:

    parent_run_id = instance.get_runs()[0].run_id

    Then reexution of the pipeline:

    result = reexecute_pipeline(inputs_pipeline, parent_run_id=parent_run_id,
                                step_keys_to_execute=['step2.compute', 'step3.compute'],
                                run_config=run_config, instance=instance)