Jinja template in Airflow always changes the value to a String
I have a custom-defined operator, my_previous_job,
in Airflow that returns a Python list.
In my DAG definition, I reference it using a Jinja template:
t_my_job = MyOperator(
    data=json.dumps({
        "jobId": f"{{{{ ti.xcom_pull(task_ids='my_previous_job', key='return_value') }}}}",
    })
)
However, the value that {{ ti.xcom_pull(task_ids='my_previous_job', key='return_value') }}
renders to is not a list, but a string with a list in it, i.e.:
Instead of:
['a','b','c']
it returns a string:
"['a','b','c']"
What can I do to get the list generated by my_previous_job
instead of a String?
1 answer
-
answered 2022-01-21 19:49
Elad Kalif
For Airflow < 2.1.0: rendered templated fields are always strings.
For Airflow >= 2.1.0: there is support for rendering fields as native Python objects. You will need to set
render_template_as_native_obj=True
in the DAG object. You can read more about it in the docs.
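For illustration, a minimal sketch of that setup (MyOperator and the task ids are the question's placeholders, and it assumes data is one of the operator's templated fields; note it passes the dict directly rather than json.dumps-ing it, since serializing at parse time would turn the rendered value back into a string):
from datetime import datetime

from airflow import DAG

with DAG(
    dag_id="my_dag",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    # render templated fields as native Python objects (Airflow >= 2.1.0)
    render_template_as_native_obj=True,
) as dag:
    t_my_job = MyOperator(  # MyOperator as defined by the asker
        task_id="my_job",
        data={
            # renders to the actual list returned by my_previous_job
            "jobId": "{{ ti.xcom_pull(task_ids='my_previous_job', key='return_value') }}",
        },
    )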
See also questions close to this topic
-
Python File Tagging System does not retrieve nested dictionaries in dictionary
I am building a file tagging system using Python. The idea is simple. Given a directory of files (and files within subdirectories), I want to filter them out using a filter input and tag those files with a word or a phrase.
If I got the following contents in my current directory:
data/
    budget.xls
    world_building_budget.txt
a.txt
b.exe
hello_world.dat
world_builder.spec
and I execute the following command in the shell:
py -3 tag_tool.py -filter=world -tag="World-Building Tool"
My output will be:
These files were tagged with "World-Building Tool":
data/world_building_budget.txt
hello_world.dat
world_builder.spec
My current output isn't exactly like this but basically, I am converting all files and files within subdirectories into a single dictionary like this:
def fs_tree_to_dict(path_):
    file_token = ''
    for root, dirs, files in os.walk(path_):
        tree = {d: fs_tree_to_dict(os.path.join(root, d)) for d in dirs}
        tree.update({f: file_token for f in files})
        return tree
Right now, my dictionary looks like this:
key: ''
In the following function, I am turning the empty values '' into empty lists (to hold my tags):
def empty_str_to_list(d):
    for k, v in d.items():
        if v == '':
            d[k] = []
        elif isinstance(v, dict):
            empty_str_to_list(v)
When I run my entire code, this is my output:
hello_world.dat ['World-Building Tool']
world_builder.spec ['World-Building Tool']
But it does not see data/world_building_budget.txt. This is the full dictionary:
{'data': {'world_building_budget.txt': []}, 'a.txt': [], 'hello_world.dat': [], 'b.exe': [], 'world_builder.spec': []}
This is my full code:
import os, argparse

def fs_tree_to_dict(path_):
    file_token = ''
    for root, dirs, files in os.walk(path_):
        tree = {d: fs_tree_to_dict(os.path.join(root, d)) for d in dirs}
        tree.update({f: file_token for f in files})
        return tree

def empty_str_to_list(d):
    for k, v in d.items():
        if v == '':
            d[k] = []
        elif isinstance(v, dict):
            empty_str_to_list(v)

parser = argparse.ArgumentParser(description="Just an example",
                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--filter", action="store", help="keyword to filter files")
parser.add_argument("--tag", action="store", help="a tag phrase to attach to a file")
parser.add_argument("--get_tagged", action="store", help="retrieve files matching an existing tag")
args = parser.parse_args()

filter = args.filter
tag = args.tag
get_tagged = args.get_tagged

current_dir = os.getcwd()
files_dict = fs_tree_to_dict(current_dir)
empty_str_to_list(files_dict)

for k, v in files_dict.items():
    if filter in k:
        if v == []:
            v.append(tag)
        print(k, v)
    elif isinstance(v, dict):
        empty_str_to_list(v)
        if get_tagged in v:
            print(k, v)
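For illustration, a recursive sketch (a hypothetical helper, not the asker's code) that also descends into nested dictionaries while building the relative path, so entries like data/world_building_budget.txt can be matched against the filter:
def tag_files(d, filter_word, tag_word, prefix=""):
    for name, value in d.items():
        path = prefix + name
        if isinstance(value, dict):
            # descend into subdirectories so nested files are not skipped
            tag_files(value, filter_word, tag_word, prefix=path + "/")
        elif filter_word in name:
            if value == []:
                value.append(tag_word)
            print(path, value)
Called as tag_files(files_dict, filter, tag), it should also print the nested data/world_building_budget.txt entry.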
-
Actually, I am working on a project and it is showing "no module named pip_internal". Please help me with this. I am using PyCharm (conda interpreter).
File "C:\Users\pjain\AppData\Local\Programs\Python\Python310\lib\runpy.py", line 196, in _run_module_as_main return _run_code(code, main_globals, None, File "C:\Users\pjain\AppData\Local\Programs\Python\Python310\lib\runpy.py", line 86, in _run_code exec(code, run_globals) File "C:\Users\pjain\AppData\Local\Programs\Python\Python310\Scripts\pip.exe\__main__.py", line 4, in <module> File "C:\Users\pjain\AppData\Local\Programs\Python\Python310\lib\site-packages\pip\_internal\__init__.py", line 4, in <module> from pip_internal.utils import _log
I am using pycharm with conda interpreter.
-
Looping the function if the input is not string
I'm new to Python (first of all). I have a homework assignment to write a function that checks whether an item exists in a dictionary or not.
inventory = {"apple" : 50, "orange" : 50, "pineapple" : 70, "strawberry" : 30} def check_item(): x = input("Enter the fruit's name: ") if not x.isalpha(): print("Error! You need to type the name of the fruit") elif x in inventory: print("Fruit found:", x) print("Inventory available:", inventory[x],"KG") else: print("Fruit not found") check_item()
I want the function to loop again only if the input is not a string. I've tried putting return under print("Error! You need to type the name of the fruit"), but it didn't work. Help.
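For illustration, a minimal sketch (reusing the asker's inventory, not a confirmed answer) that keeps re-prompting while the input is not alphabetic:
inventory = {"apple": 50, "orange": 50, "pineapple": 70, "strawberry": 30}

def check_item():
    x = input("Enter the fruit's name: ")
    # keep asking until the input looks like a word
    while not x.isalpha():
        print("Error! You need to type the name of the fruit")
        x = input("Enter the fruit's name: ")
    if x in inventory:
        print("Fruit found:", x)
        print("Inventory available:", inventory[x], "KG")
    else:
        print("Fruit not found")

check_item()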
-
How to get conf value from airflow dag?
I want to get a conf value in the DAG area.
"{{ dag_run.conf['company'] }}"
is recognized as a string. How can I get this value?
Values are passed fine when calling other dags.
t_trigger = TriggerDagRunOperator(
    task_id="t-trigger",
    trigger_dag_id="example_dag",
    conf={
        "company": "{{ dag_run.conf['company'] }}",
    },
)
However, in the dag area the value is recognized as a string.
t_task_a = PythonOperator(
    task_id="t-task-a",
    python_callable=task-a,
)

employees = Variable.get(
    "{{ dag_run.conf['company'] }}",  # problem
    default_var=['company'],
    deserialize_json=True
)

for employee in employees:
    t_employee_operator = PythonOperator(
        task_id=f"t-test-operator",
        python_callable=employee_operator,
        op_kwargs={"employee": employee}
    )
    t_task_a >> t_employee_operator
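For context, a minimal sketch (hypothetical callable, not the asker's full DAG): Jinja templates are only rendered inside templated operator fields at run time, while top-level DAG code such as the Variable.get call runs at parse time, so one option is to read dag_run.conf inside the Python callable via the task context:
from airflow.operators.python import PythonOperator

def task_a(**context):
    # dag_run.conf is available from the task context at run time
    company = context["dag_run"].conf.get("company")
    print(company)

t_task_a = PythonOperator(
    task_id="t-task-a",
    python_callable=task_a,
)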
-
How can I alternate the elements of multiple lists in Ansible?
I have multiple lists as input (all lists have the same length, but the input can have more than 3 lists). I want to create a list which is a sum of all input lists alternating their elements.
For example, given the following input:
data:
  - ['1','2','3','4','5']
  - ['6','7','8','9','10']
  - ['11','12','13','14','15']
I'm expecting the following output:
lst: [['1','6','11'],['2','7','12'],['3','8','13'],['4','9','14'],['5','10','15']]
This is what I've tried:
---
- name: zip more than 3 lists with loop
  hosts: localhost
  tasks:
    - name: Set facts
      set_fact:
        list:
          - ['1','2','3','4','5']
          - ['6','7','8','9','10']
          - ['11','12','13','14','15']
    - name: zip to make pairs of both lists
      set_fact:
        lst: "{{ list[0] | zip(list[1]) | zip(list[2]) | list }}"
    - name: Debug ['1','6','11'],['2','7','13'],...
      debug:
        msg: "{{ item | flatten }}"
      loop: "{{ lst }}"
    - name: zip to make pairs of both lists
      set_fact:
        lst2: "{{ lst2 | default([]) | zip(ansible_loop.nextitem) | list }}"
      loop: "{{ list }}"
      loop_control:
        extended: yes
    - name: Debug
      debug:
        msg: "{{ lst2 }}"
The first set_fact outputs loop elements, but lst doesn't include the actual output I expect. And the limitation of the first set_fact is that I can't iterate in the loop because of the zip filter. I don't know how to achieve my goal.
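As an aside, a plain-Python sketch of the target transformation (illustration only, not an Ansible solution): the expected lst is simply the transpose of data, i.e. a zip over all input lists.
data = [
    ['1', '2', '3', '4', '5'],
    ['6', '7', '8', '9', '10'],
    ['11', '12', '13', '14', '15'],
]
# transpose: group the i-th element of every input list together
lst = [list(group) for group in zip(*data)]
print(lst)  # [['1', '6', '11'], ['2', '7', '12'], ..., ['5', '10', '15']]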
-
How to prevent Flask-WTF forms from closing modal when submitting invalid input?
I am trying to build a website where you can upload a video (this already works). But when submitting a wrong file format, the Flask-WTF form closes the modal. I want it to stay open.
(I am trying it first with an image PNG)
This is the form:
class VideoUploadForm(FlaskForm):
    video = FileField('Upload video', validators=[FileAllowed(['png']), FileRequired()])
    submit = SubmitField('Upload')
This is the route:
@main.route("/video", methods=['GET', 'POST']) @login_required def video(): form = VideoUploadForm() if form.validate_on_submit(): f = form.video.data filename = secure_filename(f.filename) f.save(os.path.join( os.path.abspath(os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir)), 'static\\videos', filename )) flash('Video has been succesfully uploaded', 'success') return redirect(url_for('main.video')) return render_template('video.html', title='Upload video', form=form)
This is the modal:
<div class="modal fade mt-5" id="exampleModal" tabindex="-1" role="dialog" aria-labelledby="exampleModalLabel" aria-hidden="true"> <div class="modal-dialog" role="document"> <div class="modal-content"> <div class="modal-header"> <h5 class="modal-title" id="exampleModalLabel">Upload video</h5> <button type="button" class="close" data-dismiss="modal" aria-label="Close"> <span aria-hidden="true">×</span> </button> </div> <form method="POST" action="" enctype="multipart/form-data"> {{ form.hidden_tag() }} <div class="modal-body"> {% if form.video.errors %} {{ form.video(class="form-control form-control-lg is-invalid") }} <p class="mt-1 ml-1"><small>Allowed formats: mov, mp4</small></p> <div class="invalid-feedback"> {% for error in form.video.errors %} <span>{{ error }}</span> {% endfor %} </div> {% else %} {{ form.video }} <p class="mt-1 ml-1"><small>Allowed formats: mov, mp4</small></p> {% endif %} </div> <div class="modal-footer"> <button type="button" class="btn btn-secondary" data-dismiss="modal">Close</button> {{ form.submit(class="btn btn-primary") }} </div> </form> </div> </div> </div>
How can I prevent the modal from closing when I try to upload a jpg file, for example? For now it closes the modal, and if I open it again it shows the error message. But I want it to stay open so that you can immediately see the error message.
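For illustration, a hedged sketch (not the asker's code; reopen_modal is a hypothetical template flag, and video.html would still need a small script that shows the Bootstrap modal whenever the flag is set) of letting the route signal a failed validation back to the template:
from flask import redirect, render_template, url_for
# 'main', 'login_required', and VideoUploadForm are assumed to be the same
# objects used in the original route above.

@main.route("/video", methods=['GET', 'POST'])
@login_required
def video():
    form = VideoUploadForm()
    if form.validate_on_submit():
        # ... save the file exactly as in the original route ...
        return redirect(url_for('main.video'))
    # On a failed POST, form.errors is non-empty, so the template can use
    # this flag to re-open the modal and show the validation message.
    return render_template('video.html', title='Upload video', form=form,
                           reopen_modal=bool(form.errors))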
-
Connection failure when using airflow to run python script connecting to neo4j database
I'm trying to use Airflow to orchestrate a workflow where a Neo4j docker container is run and then a Python script queries data from the Neo4j database (the code runs on an AWS EC2 instance). I am able to run the Neo4j docker container successfully. But when I ran the task that queries the database, I got a connection error:
neo4j.exceptions.ServiceUnavailable: Connection to 127.0.0.1:7687 closed without handshake response
If I manually run the Python script on EC2, it can connect to the Neo4j database without any issue. The code I am using is below:
class Neo4jConnection:
    def __init__(self, uri, user, pwd):
        self.__uri = uri
        self.__user = user
        self.__pwd = pwd
        self.__driver = None
        try:
            self.__driver = GraphDatabase.driver(self.__uri, auth=(self.__user, self.__pwd))
            logger.info('SUCCESS: Connected to the Neo4j Database.')
        except Exception as e:
            logger.info('ERROR: Could not connect to the Neo4j Database. See console for details.')
            raise SystemExit(e)

    def close(self):
        if self.__driver is not None:
            self.__driver.close()

    def query(self, query, parameters=None, db=None):
        assert self.__driver is not None, "Driver not initialized!"
        session = None
        response = None
        try:
            session = self.__driver.session(database=db) if db is not None else self.__driver.session()
            response = list(session.run(query, parameters))
        except Exception as e:
            logger.info("Query failed:", e)
        finally:
            if session is not None:
                session.close()
        return response


class LoadWikiPathway2Neo4j:
    def __init__(self):
        # replace localhost with 127.0.0.1
        self.connection = Neo4jConnection(uri="bolt://localhost:7687", user="neo4j", pwd="test")

    def loadData(self):
        WIKIPATHWAY_MOUNTED_DATA_VOLUME = "/home/ec2-user/wikipathway_neo4j/data/Human/"  # the volume is mounted to neo4j docker
        WIKIPATHWAY_DATA_DOCKER_VOLUME = "file:///var/lib/neo4j/data/Human"  # file path in neo4j docker
        # connect db
        graph = self.connection
        # only run once
        graph.query('''MATCH (n) DETACH DELETE n''')
        graph.query('''CALL n10s.graphconfig.init()''')
        graph.query('''CREATE CONSTRAINT n10s_unique_uri IF NOT EXISTS ON (r:Resource) ASSERT r.uri IS UNIQUE''')
        graph.query('''call n10s.nsprefixes.removeAll()''')
        cypher = '''WITH '@prefix biopax: <http://www.biopax.org/release/biopax-level3.owl#> . \
            @prefix cito: <http://purl.org/spar/cito/> . \
            @prefix dc: <http://purl.org/dc/elements/1.1/> . \
            @prefix dcat: <http://www.w3.org/ns/dcat#> . \
            @prefix dcterms: <http://purl.org/dc/terms/> . \
            @prefix foaf: <http://xmlns.com/foaf/0.1/> . \
            @prefix freq: <http://purl.org/cld/freq/> . \
            @prefix gpml: <http://vocabularies.wikipathways.org/gpml#> . \
            @prefix hmdb: <https://identifiers.org/hmdb/> . \
            @prefix ncbigene: <https://identifiers.org/ncbigene/> . \
            @prefix owl: <http://www.w3.org/2002/07/owl#> . \
            @prefix pav: <http://purl.org/pav/> . \
            @prefix prov: <http://www.w3.org/ns/prov#> . \
            @prefix pubmed: <http://www.ncbi.nlm.nih.gov/pubmed/> . \
            @prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> . \
            @prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> . \
            @prefix skos: <http://www.w3.org/2004/02/skos/core#> . \
            @prefix void: <http://rdfs.org/ns/void#> . \
            @prefix wp: <http://vocabularies.wikipathways.org/wp#> . \
            @prefix wprdf: <http://rdf.wikipathways.org/> . \
            @prefix xsd: <http://www.w3.org/2001/XMLSchema#> . ' as txt \
            CALL n10s.nsprefixes.addFromText(txt) yield prefix, namespace RETURN prefix, namespace'''
        graph.query(cypher)

        fileLst = os.listdir(WIKIPATHWAY_MOUNTED_DATA_VOLUME)
        # load data
        for filename in fileLst:
            filePath = f'{WIKIPATHWAY_DATA_DOCKER_VOLUME}/{filename}'
            print(filePath)
            cypher = f'''CALL n10s.rdf.import.fetch("{filePath}","Turtle")'''
            logger.info(cypher)
            graph.query(cypher)
            logger.info(f"{filename} is loaded")
        graph.close()


def load_wikipathway_files():
    data = LoadWikiPathway2Neo4j()
    data.loadData()


with DAG(
    default_args=default,
    dag_id=os.path.basename(__file__).replace(".py", ""),
    schedule_interval=None,
    catchup=False,
    start_date=days_ago(1),
    tags=['s3', 'download_wikipathway'],
) as dag:
    loadWikipathwayFile = PythonOperator(
        task_id="load_wikipathway",
        python_callable=load_wikipathway_files
    )
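One hedged observation (an assumption about the setup, not a confirmed fix): if the Airflow task does not run in the same network namespace as the Neo4j container, bolt://localhost:7687 can point at the wrong host. A minimal sketch that makes the URI configurable instead of hard-coded (NEO4J_URI is a hypothetical environment variable name; Neo4jConnection is the class above):
import os

# Hypothetical: read the Bolt URI from the environment so the DAG task does
# not assume Neo4j is reachable at localhost from wherever it executes
# (e.g. use the EC2 host's private IP or a Docker network alias instead).
NEO4J_URI = os.environ.get("NEO4J_URI", "bolt://localhost:7687")

connection = Neo4jConnection(uri=NEO4J_URI, user="neo4j", pwd="test")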
-
Dynamic TaskGroup in Airflow 2
I have a function run_step that produces a dynamic number of EMR tasks within a task group. I want to keep this function in a separate file named helpers.py so that other DAGs can use it and I don't have to rewrite it (in the examples below, I have hard-coded certain values for clarity; otherwise they would be variables):
def run_step(my_group_id, config, dependencies):
    task_group = TaskGroup(group_id = my_group_id)
    for c in config:
        task_id = 'start_' + c['task_name']
        task_name = c['task_name']
        add_step = EmrAddStepsOperator(
            task_group=my_group_id,
            task_id=task_id,
            job_flow_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='emr', key='return_value') }}",
            steps=create_emr_step(args=config[c], d=dependencies[c]),
            aws_conn_id='aws_default'
        )
        wait_for_step = EmrStepSensor(
            task_group=my_group_id,
            task_id='wait_for_' + task_name + '_step',
            job_flow_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='emr', key='return_value') }}",
            step_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='" + f"{my_group_id}.{task_id}" + "', key='return_value')[0] }}"
        )
        add_step >> wait_for_step
    return task_group
The code in my_dag.py which calls this function looks like:
execute_my_step = create_emr_step(
    my_group_id = 'execute_steps'
    config = my_tasks,
    dependencies = my_dependencies
)

some_steps >> execute_my_step
I am expecting this to produce a task group that contains two steps for every item in config, but it only produces one step labeled as create_emr_step with no task group. I tried putting the TaskGroup in the dag (and made the necessary changes to run_step) as shown below, but that did not work either:
with TaskGroup('execute_steps') as execute_steps:
    execute_my_step = create_emr_step(
        my_group_id = 'execute_steps'
        config = my_tasks,
        dependencies = my_dependencies
    )
Is it possible to do this? I need to produce steps dynamically because our pipeline is so big. I was doing this successfully with subdags in a similar way, but can't figure out how to get this to work with task groups. Would it be easier to write my own operator?
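For what it's worth, a hedged sketch of the helper (assumed provider import paths; create_emr_step, config, and dependencies are the question's own objects) that builds the operators inside a with TaskGroup(...) block, so they attach to the group object rather than receiving a group-id string; it would then be called as run_step(...) from inside the DAG's with block in my_dag.py:
from airflow.utils.task_group import TaskGroup
from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor

def run_step(my_group_id, config, dependencies):
    # Operators created inside the "with" block are attached to this group,
    # so there is no need to pass task_group= as a string.
    with TaskGroup(group_id=my_group_id) as task_group:
        for c in config:
            task_id = 'start_' + c['task_name']
            add_step = EmrAddStepsOperator(
                task_id=task_id,
                job_flow_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='emr', key='return_value') }}",
                steps=create_emr_step(args=config[c], d=dependencies[c]),
                aws_conn_id='aws_default',
            )
            wait_for_step = EmrStepSensor(
                task_id='wait_for_' + c['task_name'] + '_step',
                job_flow_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='emr', key='return_value') }}",
                step_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='"
                        + f"{my_group_id}.{task_id}" + "', key='return_value')[0] }}",
            )
            add_step >> wait_for_step
    return task_group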