Dagster: how to reexecute failed steps of a pipeline?
I created a test pipeline that fails mid-way. I want to re-execute it programmatically, starting at the failed step and moving forward, without repeating the earlier, successful steps.
from dagster import DagsterInstance, execute_pipeline, pipeline, solid, reexecute_pipeline
from random import random

instance = DagsterInstance.ephemeral()

@solid
def step1(context, data):
    return range(10), ['a' + str(i) for i in range(10)]

@solid
def step2(context, step1op):
    x, y = step1op
    # simulation of noise
    xx = [el * (1 + 0.1 * random()) for el in x]
    xx2 = [(el - 1) / el for el in xx]  # fails mid-way: the first element of x is 0
    return zip(xx, xx2), y

@solid
def step3(context, step2op):
    x, y = step2op
    ...
    return x, y

run_config = {...}

@pipeline
def inputs_pipeline():
    step3(step2(step1()))
1 answer
-
answered 2020-11-23 14:21
sophros
Programmatic re-execution of part of the pipeline requires the ID of the parent run, which is available from the instance:
parent_run_id = instance.get_runs()[0].run_id
Then re-execute the pipeline from the failed step onward:
result = reexecute_pipeline(
    inputs_pipeline,
    parent_run_id=parent_run_id,
    step_keys_to_execute=['step2.compute', 'step3.compute'],
    run_config=run_config,
    instance=instance,
)
See also questions close to this topic
-
Python tenacity retry multiple exceptions
Is there any way to pass multiple exception types to tenacity's
retry_if_exception_type
? I'd like to add
httpx.ReadTimeout
to the example below:

from typing import List

import httpx
from tenacity import retry, retry_if_exception_type

@retry(retry=retry_if_exception_type(httpx.TimeoutException))
async def fetch(urls: List):
    client = httpx.AsyncClient(http2=True)
    for url in urls:
        await client.get(url)
-
Pandas: Combine rows in Dataframe column based on condition
I have an excerpt of the following
df
:

                    Causa de muerte   Sexo            Edad  Periodo  Total
39  001-102 I-XXII.Todas las causas  Total  Menos de 1 año     2018   1027
40  001-102 I-XXII.Todas las causas  Total  Menos de 1 año     2017   1092
41  001-102 I-XXII.Todas las causas  Total  Menos de 1 año     2016   1120
78  001-102 I-XXII.Todas las causas  Total   De 1 a 4 años     2018    240
79  001-102 I-XXII.Todas las causas  Total   De 1 a 4 años     2017    226
80  001-102 I-XXII.Todas las causas  Total   De 1 a 4 años     2016    248
Is there a way to combine/merge the rows where 'Edad' == 'Menos de 1 año' and 'Edad' == 'De 1 a 4 años', renaming the merged rows to 'De 0 a 4 años'? My goal is for the corresponding numbers in 'Total' to add up:

                    Causa de muerte   Sexo           Edad  Periodo  Total
39  001-102 I-XXII.Todas las causas  Total  De 0 a 4 años     2018   1267
40  001-102 I-XXII.Todas las causas  Total  De 0 a 4 años     2017   1318
41  001-102 I-XXII.Todas las causas  Total  De 0 a 4 años     2016   1368
I'm assuming you'll do it with groupby, but I haven't figured out how.
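One groupby-based way, sketched on a hypothetical miniature of the df (a few rows with the same columns as the question): relabel both age bands to the merged name, then group by the remaining keys and sum 'Total'.

```python
import pandas as pd

# Hypothetical miniature of the df from the question.
df = pd.DataFrame({
    "Causa de muerte": ["001-102 I-XXII.Todas las causas"] * 4,
    "Sexo": ["Total"] * 4,
    "Edad": ["Menos de 1 año", "De 1 a 4 años",
             "Menos de 1 año", "De 1 a 4 años"],
    "Periodo": [2018, 2018, 2017, 2017],
    "Total": [1027, 240, 1092, 226],
})

# Map both age bands onto the merged label...
df["Edad"] = df["Edad"].replace({"Menos de 1 año": "De 0 a 4 años",
                                 "De 1 a 4 años": "De 0 a 4 años"})

# ...then sum 'Total' within each remaining group.
merged = (df.groupby(["Causa de muerte", "Sexo", "Edad", "Periodo"],
                     as_index=False)["Total"].sum())
print(merged)
```

With the question's numbers this yields 1267 for 2018 and 1318 for 2017, matching the desired output.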
-
3D indoor scene reconstruction using point clouds
I need to build a 3D model from RGB-D images. Please suggest a good library for indoor scene reconstruction and the steps involved.
-
Start uwsgi command failed with python error
When I start the
uwsgi uwsgi.ini
command I get the error below; maybe I missed some parameters. What should I do to solve it? Thanks for your answer. Nilmoty

...
*** Operational MODE: preforking+threaded ***
Traceback (most recent call last):
  File "app.py", line 3, in <module>
    from flask import Flask, url_for, request, jsonify, redirect
  File "/srv/domo/venv/lib/python3.8/site-packages/flask/__init__.py", line 16, in <module>
    from werkzeug.exceptions import abort
  File "/srv/domo/venv/lib/python3.8/site-packages/werkzeug/__init__.py", line 15, in <module>
    from .serving import run_simple
  File "/srv/domo/venv/lib/python3.8/site-packages/werkzeug/serving.py", line 58, in <module>
    from http.server import BaseHTTPRequestHandler
  File "/usr/local/lib/python3.8/http/server.py", line 628, in <module>
    class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
  File "/usr/local/lib/python3.8/http/server.py", line 880, in SimpleHTTPRequestHandler
    mimetypes.init() # try to read system mime.types
  File "/usr/local/lib/python3.8/mimetypes.py", line 364, in init
    db.read(file)
  File "/usr/local/lib/python3.8/mimetypes.py", line 206, in read
    self.readfp(fp, strict)
  File "/usr/local/lib/python3.8/mimetypes.py", line 217, in readfp
    line = fp.readline()
  File "/usr/local/lib/python3.8/codecs.py", line 322, in decode
    (result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xa3 in position 21: invalid start byte
unable to load app 0 (mountpoint='') (callable not found or import error)
*** no app loaded. going in full dynamic mode ***
...
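The UnicodeDecodeError is raised while mimetypes.init() reads a system mime.types file containing a byte (0xa3) that is not valid UTF-8. As a hedged diagnostic sketch, you could check which of the candidate files (taken from mimetypes.knownfiles) fails to decode:

```python
import mimetypes
from pathlib import Path

def bad_mime_files():
    """Return (path, error) for every known mime.types file that is not
    valid UTF-8, i.e. the condition that crashes mimetypes.init()."""
    problems = []
    for name in mimetypes.knownfiles:  # candidate system mime.types paths
        path = Path(name)
        if not path.is_file():
            continue
        try:
            path.read_text(encoding="utf-8")
        except UnicodeDecodeError as err:
            problems.append((name, err))
    return problems

print(bad_mime_files())
```

Any file it reports can then be inspected (and the offending byte fixed or removed) so the import chain through werkzeug no longer crashes.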
-
clear cache of cachetools
I have this function:

from typing import Dict, List, Tuple

from cachetools import cached
from cachetools import LRUCache

@cached(cache=LRUCache(maxsize=256),
        key=lambda system, bus_map, get_hint=False, more_info=False:
            (hash_dict(bus_map), get_hint, more_info))
def calc_wait_time(system: System,
                   bus_map: Dict[int, List[Tuple[int, int]]],
                   get_hint: bool = False,
                   more_info: bool = False):

(where System is a class I defined and hash_dict converts a dict to a frozenset)

I tried to use
calc_wait_time.cache_clear()
but it says 'function' object has no attribute 'cache_clear'. I also tried
calc_wait_time.clear()
but it doesn't work. What am I doing wrong?
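A workaround sketch (with a simplified signature standing in for calc_wait_time's real one): keep a reference to the LRUCache and clear it directly. cachetools caches are mutable mappings, so .clear() on the cache object works even when the wrapped function exposes no cache_clear attribute.

```python
from cachetools import LRUCache, cached

# Keep a reference to the cache object instead of constructing it inline,
# so it can be cleared explicitly later.
cache = LRUCache(maxsize=256)
calls = []

@cached(cache=cache)
def calc_wait_time(x):  # simplified stand-in for the real signature
    calls.append(x)
    return x * 2

calc_wait_time(3)
calc_wait_time(3)   # served from the cache; the body does not run again
cache.clear()       # empty the cache directly
calc_wait_time(3)   # recomputed after clearing
```

The same pattern works with the question's custom key function; only the cache construction moves out of the decorator call.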
-
Jenkinsfile fails when I use Docker in it
I'm using Jenkins 2.263.1 and I want to use Docker with a pipeline. My Jenkinsfile is:
pipeline {
  agent {
    docker {
      image 'android-build:android-gradle'
      args '-v $PWD/Android:/app -w /app'
    }
  }
  stages {
    stage('Build') {
      steps {
        sh 'cd /app && ./gradlew build'
      }
    }
  }
}
I get the following error:
Started by user bani ertebat mahan
Obtained Jenkinsfile from git http://git.banicodevelopers.com/ci-cd/android-app.git
Running in Durability level: MAX_SURVIVABILITY
[Pipeline] Start of Pipeline
[Pipeline] node
Running on Jenkins in /var/lib/jenkins/workspace/Android-App
[Pipeline] {
[Pipeline] stage
[Pipeline] { (Declarative: Checkout SCM)
[Pipeline] checkout
The recommended git tool is: NONE
using credential cd5d4a9e-e5e5-4808-b1a4-dcac90da701f
 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url http://git.banicodevelopers.com/ci-cd/android-app.git # timeout=10
Fetching upstream changes from http://git.banicodevelopers.com/ci-cd/android-app.git
 > git --version # timeout=10
 > git --version # 'git version 1.8.3.1'
using GIT_ASKPASS to set credentials
 > git fetch --tags --progress http://git.banicodevelopers.com/ci-cd/android-app.git +refs/heads/*:refs/remotes/origin/* # timeout=10
 > git rev-parse refs/remotes/origin/master^{commit} # timeout=10
Checking out Revision d02dcbf66909c49106ea102d94c42df03d424cc4 (refs/remotes/origin/master)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f d02dcbf66909c49106ea102d94c42df03d424cc4 # timeout=10
Commit message: "remove --rm options"
 > git rev-list --no-walk 64ad06f998001cb9cbedb8b88df32cfe8a86b679 # timeout=10
[Pipeline] }
[Pipeline] // stage
[Pipeline] withEnv
[Pipeline] {
[Pipeline] isUnix
[Pipeline] sh
+ docker inspect -f . android-build:android-gradle
.
[Pipeline] withDockerContainer
Jenkins does not seem to be running inside a container
$ docker run -t -d -u 998:996 -v $PWD/Android:/app -w /app -w /var/lib/jenkins/workspace/Android-App -v /var/lib/jenkins/workspace/Android-App:/var/lib/jenkins/workspace/Android-App:rw,z -v /var/lib/jenkins/workspace/Android-App@tmp:/var/lib/jenkins/workspace/Android-App@tmp:rw,z -e ******** -e ******** -e ******** -e ******** -e ******** -e ******** -e ******** -e ******** -e ******** -e ******** -e ******** -e ******** -e ******** -e ******** -e ******** -e ******** -e ******** -e ******** -e ******** -e ******** -e ******** -e ******** -e ******** -e ******** -e ******** -e ******** -e ******** -e ******** -e ******** -e ******** android-build:android-gradle cat
$ docker top c8729930a9b86c9ea66bfd674e233c75778d61376edfb7278d930acc6401595a -eo pid,comm
[Pipeline] {
[Pipeline] stage
[Pipeline] { (Build)
[Pipeline] sh
+ ./gradlew build
/var/lib/jenkins/workspace/Android-App@tmp/durable-83d3c6ea/script.sh: 1: /var/lib/jenkins/workspace/Android-App@tmp/durable-83d3c6ea/script.sh: ./gradlew: not found
[Pipeline] }
[Pipeline] // stage
[Pipeline] }
$ docker stop --time=1 c8729930a9b86c9ea66bfd674e233c75778d61376edfb7278d930acc6401595a
$ docker rm -f c8729930a9b86c9ea66bfd674e233c75778d61376edfb7278d930acc6401595a
[Pipeline] // withDockerContainer
[Pipeline] }
[Pipeline] // withEnv
[Pipeline] }
[Pipeline] // node
[Pipeline] End of Pipeline
ERROR: script returned exit code 127
Finished: FAILURE
Although I never defined the -d, -u, or -t options in my Jenkinsfile, they are being added. I defined only one -w option, but a second -w is also added for
/var/lib/jenkins/workspace/
I have added jenkins user to docker group.
Any help appreciated.
-
How to conditionally add a pipeline element in bash
I need to form a pipeline of various commands. Some elements of the pipeline, or sequences of elements, are only relevant when some condition holds. Now, I could write:
if [[ $whatever ]]; then
  cmd1 | cmd2 | cmd3 | cmd4
else
  cmd1 | cmd4
fi

but that means repeating
cmd1
and
cmd4
, plus, there may be several conditions and I don't want to write nested if's. So, I tried writing this:

if [[ $whatever ]]; then
  pipeline_segment="| cmd2 | cmd3"
else
  pipeline_segment=""
fi
cmd1 ${pipeline_segment} | cmd4
but the pipe symbol was not interpreted as an instruction to create a pipe.
How do I have bash execute the pipeline I want it to?
Note: You may assume a bash version of 4 or higher, but only if you must.
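One pattern that avoids both the repetition and eval: wrap the optional stages in a function that falls back to cat when the condition is false, so the pipeline's shape stays fixed. A sketch with tr and rev as hypothetical stand-ins for cmd2 and cmd3, and sed standing in for cmd4:

```shell
#!/usr/bin/env bash
# The optional middle stages live in a function; when the condition is
# false, the function degrades to `cat`, a pass-through.
whatever=1    # hypothetical condition flag

middle() {
  if [[ $whatever ]]; then
    tr 'a-z' 'A-Z' | rev    # stand-ins for cmd2 | cmd3
  else
    cat                     # condition false: pass data through untouched
  fi
}

printf 'hello\n' | middle | sed 's/$/!/'   # cmd1 | middle | cmd4
```

An alternative is eval on the assembled string (eval "cmd1 $pipeline_segment | cmd4"), but the function form avoids eval's quoting pitfalls; with several independent conditions, each segment simply gets its own pass-through function.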
-
Azure DevOps Server 2019 - Create pipeline - no "GitHub YAML" option in "Where is your code"
-
Does dagster support dependencies between solids with no outputs?
I am building a prototype pipeline that does two things:
- (solid) Clears files out of an existing directory
- (solid) Runs a batch process to dump data into that directory.
Step #1 is all side-effect, and has no output to pass to #2. Is it possible to express a dependency between these two solids in a pipeline?
-
Compose a pipeline with other pipelines
I am currently trying to choose between numerous workflow frameworks. I need one important feature: workflow composition.
I found nothing about it in the documentation, even in the API reference or the advanced tutorial.
So my question is: is it possible to compose pipelines, i.e. to build DAGs from already-written ones? There may be some workarounds, but I am interested in native support for it.
Thanks
-
Executing a solid when at least one of the required inputs is given
As an input, I would like to retrieve data based on user input, or "randomly" from a DB if no user input is given. All other downstream tasks of the pipeline would be the same.
Therefore, I would like to create a pipeline starting with solids A and B, and a downstream solid C executed based on input from solid A OR solid B.
However, when using conditional outputs on solids A and B, solid C is not executed, as one input is not generated by upstream solids.
Is there a simple way of doing this that I am missing?
Thanks for your help.