How to copy a pandas DataFrame to Vertica with Airflow?
When I run the following function:
```python
def write_df_to_vertica(conn: Cursor, df: pd.DataFrame):
    query = """COPY temp_data.test(column1, column2, column3) FROM STDIN DELIMITER ',' """
    conn.copy(query, df.to_csv(index=False, header=False))
```
it works, and the data is written to the Vertica database.
However, when I call this function from a PythonOperator in an Airflow DAG, the data isn't written to the database.
Why, and how can I fix it?
1 answer
-
answered 2022-05-04 09:59
Rayan Suneer
Maybe try VerticaPy's `pandas_to_vertica` function instead. As far as I can tell, this is its signature:
`pandas_to_vertica(df, cursor=None, name: str = "", schema: str = "", dtype: dict = {}, parse_n_lines: int = 10000, temp_path: str = "", insert: bool = False,)`
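Whichever helper ends up being used, here is a minimal sketch of how the write could be wired into a PythonOperator, assuming a `vertica_python` connection (the connection details and sample DataFrame are placeholders, not the original setup). One thing worth checking in the Airflow case is that the load is committed inside the task; a `COPY` through `cursor.copy` without a commit (and without autocommit) is a common reason the call succeeds but no rows ever appear:

```python
import pandas as pd
import vertica_python
from airflow.operators.python import PythonOperator

def load_df_to_vertica(**context):
    # Placeholder connection details -- in practice read them from an Airflow
    # Connection or environment variables instead of hard-coding them.
    conn_info = {"host": "my-vertica-host", "port": 5433, "user": "dbadmin",
                 "password": "***", "database": "mydb"}
    df = pd.DataFrame({"column1": [1], "column2": [2], "column3": [3]})
    with vertica_python.connect(**conn_info) as connection:
        cursor = connection.cursor()
        query = """COPY temp_data.test(column1, column2, column3) FROM STDIN DELIMITER ',' """
        cursor.copy(query, df.to_csv(index=False, header=False))
        connection.commit()  # make sure the COPY is committed inside the task

# inside the existing DAG definition:
# PythonOperator(task_id="write_df_to_vertica", python_callable=load_df_to_vertica)
```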
See also questions close to this topic
-
Python File Tagging System does not retrieve nested dictionaries in dictionary
I am building a file tagging system using Python. The idea is simple. Given a directory of files (and files within subdirectories), I want to filter them out using a filter input and tag those files with a word or a phrase.
If I have the following contents in my current directory:

```
data/
budget.xls
world_building_budget.txt
a.txt
b.exe
hello_world.dat
world_builder.spec
```
and I execute the following command in the shell:
```
py -3 tag_tool.py -filter=world -tag="World-Building Tool"
```
My output will be:
```
These files were tagged with "World-Building Tool":
data/world_building_budget.txt
hello_world.dat
world_builder.spec
```
My current output isn't exactly like this but basically, I am converting all files and files within subdirectories into a single dictionary like this:
```python
def fs_tree_to_dict(path_):
    file_token = ''
    for root, dirs, files in os.walk(path_):
        tree = {d: fs_tree_to_dict(os.path.join(root, d)) for d in dirs}
        tree.update({f: file_token for f in files})
        return tree
```
Right now, my dictionary looks like this: `key: ''`. In the following function, I am turning the empty values (`''`) into empty lists (to hold my tags):

```python
def empty_str_to_list(d):
    for k, v in d.items():
        if v == '':
            d[k] = []
        elif isinstance(v, dict):
            empty_str_to_list(v)
```
When I run my entire code, this is my output:
```
hello_world.dat ['World-Building Tool']
world_builder.spec ['World-Building Tool']
```
But it does not see `data/world_building_budget.txt`. This is the full dictionary:

```python
{'data': {'world_building_budget.txt': []}, 'a.txt': [], 'hello_world.dat': [], 'b.exe': [], 'world_builder.spec': []}
```
This is my full code:
```python
import os, argparse

def fs_tree_to_dict(path_):
    file_token = ''
    for root, dirs, files in os.walk(path_):
        tree = {d: fs_tree_to_dict(os.path.join(root, d)) for d in dirs}
        tree.update({f: file_token for f in files})
        return tree

def empty_str_to_list(d):
    for k, v in d.items():
        if v == '':
            d[k] = []
        elif isinstance(v, dict):
            empty_str_to_list(v)

parser = argparse.ArgumentParser(description="Just an example",
                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--filter", action="store", help="keyword to filter files")
parser.add_argument("--tag", action="store", help="a tag phrase to attach to a file")
parser.add_argument("--get_tagged", action="store", help="retrieve files matching an existing tag")
args = parser.parse_args()

filter = args.filter
tag = args.tag
get_tagged = args.get_tagged

current_dir = os.getcwd()
files_dict = fs_tree_to_dict(current_dir)
empty_str_to_list(files_dict)

for k, v in files_dict.items():
    if filter in k:
        if v == []:
            v.append(tag)
        print(k, v)
    elif isinstance(v, dict):
        empty_str_to_list(v)
        if get_tagged in v:
            print(k, v)
```
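For what it's worth, the tagging loop only iterates over the top level of `files_dict`, so anything nested under `data` is never matched against the filter. A minimal sketch of a recursive walk that also tags files inside nested dictionaries (the helper name and path handling here are illustrative, not part of the original code):

```python
import os

def tag_files(tree, filter_word, tag, prefix=""):
    """Recursively walk the nested dict and append `tag` to matching files."""
    for name, value in tree.items():
        path = os.path.join(prefix, name)
        if isinstance(value, dict):
            # directory: recurse so nested files are also considered
            tag_files(value, filter_word, tag, prefix=path)
        elif filter_word in name:
            if tag not in value:
                value.append(tag)
            print(path, value)

# usage with the structures from the question:
# tag_files(files_dict, filter, tag)
```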
-
Actually, I am working on a project and it is showing "no module named pip_internal". Please help me with this. I am using PyCharm (conda interpreter).
File "C:\Users\pjain\AppData\Local\Programs\Python\Python310\lib\runpy.py", line 196, in _run_module_as_main return _run_code(code, main_globals, None, File "C:\Users\pjain\AppData\Local\Programs\Python\Python310\lib\runpy.py", line 86, in _run_code exec(code, run_globals) File "C:\Users\pjain\AppData\Local\Programs\Python\Python310\Scripts\pip.exe\__main__.py", line 4, in <module> File "C:\Users\pjain\AppData\Local\Programs\Python\Python310\lib\site-packages\pip\_internal\__init__.py", line 4, in <module> from pip_internal.utils import _log
I am using PyCharm with a conda interpreter.
-
Looping the function if the input is not a string
I'm new to Python (first of all). I have a homework assignment to write a function that checks whether an item exists in a dictionary or not.
```python
inventory = {"apple": 50, "orange": 50, "pineapple": 70, "strawberry": 30}

def check_item():
    x = input("Enter the fruit's name: ")
    if not x.isalpha():
        print("Error! You need to type the name of the fruit")
    elif x in inventory:
        print("Fruit found:", x)
        print("Inventory available:", inventory[x], "KG")
    else:
        print("Fruit not found")

check_item()
```
I want the function to loop again only if the input is not a string. I tried adding `return` under `print("Error! You need to type the name of the fruit")`, but that didn't work. Help!
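A minimal sketch of one way to do this, assuming the same `inventory` dictionary: wrap the prompt in a `while` loop and only break out once the input passes the `isalpha()` check:

```python
inventory = {"apple": 50, "orange": 50, "pineapple": 70, "strawberry": 30}

def check_item():
    while True:
        x = input("Enter the fruit's name: ")
        if not x.isalpha():
            # keep asking until the input is purely alphabetic
            print("Error! You need to type the name of the fruit")
            continue
        if x in inventory:
            print("Fruit found:", x)
            print("Inventory available:", inventory[x], "KG")
        else:
            print("Fruit not found")
        break  # valid (alphabetic) input: stop looping

check_item()
```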
-
Any efficient way to compare two dataframes and append new entries in pandas?
I have new files which I want to add to a historical table. Before that, I need to check each new file against the historical table by comparing two columns in particular: `state` and the `date` column. First, I need to find `max(state, date)` in the new file, then check whether those entries already exist in the historical table; if they are not in the historical table, append them, otherwise do nothing. I tried to do this in pandas with a `group-by` on the new file and the historical table and then a comparison: if the new file has entries that are not in the historical data, add them. Now I have issues appending the new values to the historical table correctly in pandas. Does anyone have quick thoughts?

My current attempt:
```python
import pandas as pd

src_df = pd.read_csv("https://raw.githubusercontent.com/adamFlyn/test_rl/main/src_df.csv")
hist_df = pd.read_csv("https://raw.githubusercontent.com/adamFlyn/test_rl/main/historical_df.csv")

picked_rows = src_df.loc[src_df.groupby('state')['yyyy_mm'].idxmax()]
```
I want to check `picked_rows` against `hist_df`, comparing by the `state` and `yyyy_mm` columns, so that only entries from `picked_rows` where `state` has the `max` value or the most recent dates are added. I created the desired output below. I tried an inner join and `pandas.concat`, but neither gives me the correct output. Does anyone have any ideas on this?

Here is the desired output that I want to get:
```python
import pandas as pd

desired_output = pd.read_csv("https://raw.githubusercontent.com/adamFlyn/test_rl/main/output_df.csv")
```
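One possible sketch (not verified against the linked CSVs): an anti-join via `merge(..., indicator=True)` on `state` and `yyyy_mm`, keeping only the picked rows that do not already exist in the historical table, then concatenating those onto it:

```python
import pandas as pd

src_df = pd.read_csv("https://raw.githubusercontent.com/adamFlyn/test_rl/main/src_df.csv")
hist_df = pd.read_csv("https://raw.githubusercontent.com/adamFlyn/test_rl/main/historical_df.csv")

# latest entry per state in the new file
picked_rows = src_df.loc[src_df.groupby('state')['yyyy_mm'].idxmax()]

# anti-join: rows of picked_rows whose (state, yyyy_mm) pair is absent from hist_df
merged = picked_rows.merge(
    hist_df[['state', 'yyyy_mm']], on=['state', 'yyyy_mm'],
    how='left', indicator=True
)
new_entries = merged[merged['_merge'] == 'left_only'].drop(columns='_merge')

# append only the genuinely new rows to the historical table
updated_hist = pd.concat([hist_df, new_entries], ignore_index=True)
```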
-
How to bring data frame into single column from multiple columns in python
I have data in this multi-column format, and I want to bring all 4 `pcp` columns into a single column.
```
YEAR  Month  pcp1  pcp2  pcp3  pcp4
1984  1      0     0     0     0
1984  2      1.2   0     0     0
1984  3      0     0     0     0
1984  4      0     0     0     0
1984  5      0     0     0     0
1984  6      0     0     0     1.6
1984  7      3     3     9.2   3.2
1984  8      6.2   27.1  5.4   0
1984  9      0     0     0     0
1984  10     0     0     0     0
1984  11     0     0     0     0
1984  12     0     0     0     0
```
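A minimal sketch using `pandas.melt`, assuming the table above is already in a DataFrame with the column names shown (the small inline DataFrame is just an illustrative subset):

```python
import pandas as pd

# illustrative subset of the data shown above
df = pd.DataFrame({
    "YEAR":  [1984, 1984, 1984],
    "Month": [6, 7, 8],
    "pcp1":  [0, 3, 6.2],
    "pcp2":  [0, 3, 27.1],
    "pcp3":  [0, 9.2, 5.4],
    "pcp4":  [1.6, 3.2, 0],
})

# stack the four pcp columns into one 'pcp' column, keeping YEAR and Month
long_df = df.melt(id_vars=["YEAR", "Month"],
                  value_vars=["pcp1", "pcp2", "pcp3", "pcp4"],
                  var_name="station", value_name="pcp")
print(long_df)
```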
-
Exclude Japanese Stopwords from File
I am trying to remove Japanese stopwords from a text corpus from twitter. Unfortunately the frequently used nltk does not contain Japanese, so I had to figure out a different way.
This is my MWE:
```python
import urllib
from urllib.request import urlopen
import MeCab
import re

# slothlib
slothlib_path = "http://svn.sourceforge.jp/svnroot/slothlib/CSharp/Version1/SlothLib/NLP/Filter/StopWord/word/Japanese.txt"
sloth_file = urllib.request.urlopen(slothlib_path)

# stopwordsiso
iso_path = "https://raw.githubusercontent.com/stopwords-iso/stopwords-ja/master/stopwords-ja.txt"
iso_file = urllib.request.urlopen(iso_path)
stopwords = [line.decode("utf-8").strip() for line in iso_file]
stopwords = [ss for ss in stopwords if not ss == u'']
stopwords = list(set(stopwords))

text = '日本語の自然言語処理は本当にしんどい、と彼は十回言った。'

tagger = MeCab.Tagger("-Owakati")
tok_text = tagger.parse(text)

ws = re.compile(" ")
words = [word for word in ws.split(tok_text)]
if words[-1] == u"\n":
    words = words[:-1]

ws = [w for w in words if w not in stopwords]

print(words)
print(ws)
```
This completes successfully: it gives out the original tokenized text as well as the one without stopwords:

```
['日本語', 'の', '自然', '言語', '処理', 'は', '本当に', 'しんどい', '、', 'と', '彼', 'は', '十', '回', '言っ', 'た', '。']
['日本語', '自然', '言語', '処理', '本当に', 'しんどい', '、', '十', '回', '言っ', '。']
```
There are still 2 issues I am facing though:

a) Is it possible to have both stopword lists considered, namely `iso_file` and `sloth_file`, so that a word is removed if it is a stopword in either `iso_file` or `sloth_file`? (I tried changing line 14 to `stopwords = [line.decode("utf-8").strip() for line in zip('iso_file','sloth_file')]` but received an error that tuple attributes may not be decoded.)

b) The ultimate goal would be to generate a new text file in which all stopwords are removed.
I had created this MWE
```python
### first clean twitter csv
import pandas as pd
import re
import emoji

df = pd.read_csv("input.csv")

def cleaner(tweet):
    tweet = re.sub(r"@[^\s]+", "", tweet)  # Remove @username
    tweet = re.sub(r"(?:\@|http?\://|https?\://|www)\S+|\\n", "", tweet)  # Remove http links & \n
    tweet = " ".join(tweet.split())
    tweet = ''.join(c for c in tweet if c not in emoji.UNICODE_EMOJI)  # Remove Emojis
    tweet = tweet.replace("#", "").replace("_", " ")  # Remove hashtag sign but keep the text
    return tweet

df['text'] = df['text'].map(lambda x: cleaner(x))
df['text'].to_csv(r'cleaned.txt', header=None, index=None, sep='\t', mode='a')

### remove stopwords
import urllib
from urllib.request import urlopen
import MeCab
import re

# slothlib
slothlib_path = "http://svn.sourceforge.jp/svnroot/slothlib/CSharp/Version1/SlothLib/NLP/Filter/StopWord/word/Japanese.txt"
sloth_file = urllib.request.urlopen(slothlib_path)

# stopwordsiso
iso_path = "https://raw.githubusercontent.com/stopwords-iso/stopwords-ja/master/stopwords-ja.txt"
iso_file = urllib.request.urlopen(iso_path)
stopwords = [line.decode("utf-8").strip() for line in iso_file]
stopwords = [ss for ss in stopwords if not ss == u'']
stopwords = list(set(stopwords))

with open("cleaned.txt", encoding='utf8') as f:
    cleanedlist = f.readlines()
cleanedlist = list(set(cleanedlist))

tagger = MeCab.Tagger("-Owakati")
tok_text = tagger.parse(cleanedlist)

ws = re.compile(" ")
words = [word for word in ws.split(tok_text)]
if words[-1] == u"\n":
    words = words[:-1]

ws = [w for w in words if w not in stopwords]

print(words)
print(ws)
```
While it works for the simple input text in the first MWE, for the MWE I just stated I get the error

```
in method 'Tagger_parse', argument 2 of type 'char const *'
Additional information:
Wrong number or type of arguments for overloaded function 'Tagger_parse'.
  Possible C/C++ prototypes are:
    MeCab::Tagger::parse(MeCab::Lattice *) const
    MeCab::Tagger::parse(char const *)
```

for this line:

```python
tok_text = tagger.parse(cleanedlist)
```

So I assume I will need to make amendments to the `cleanedlist`? I have uploaded cleaned.txt on GitHub for reproducing the issue: [txt on github][1]
Also: how would I be able to get the tokenized list that excludes stopwords back into a text format like cleaned.txt? Would it be possible, for this purpose, to create a df of `ws`? Or might there be an even simpler way?
Sorry for the long request, I tried a lot and tried to make it as easy as possible to understand what I'm driving at :-)
Thank you very much! [1]: https://gist.github.com/yin-ori/1756f6236944e458fdbc4a4aa8f85a2c
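A minimal sketch of how the two stopword lists could be merged and the stopword-free text written back to a file. It assumes the same URLs as the MWE above, and that `MeCab.Tagger.parse` is fed one string (a line) at a time rather than a list, which is what the `char const *` error is complaining about:

```python
import urllib.request
import MeCab

def load_stopwords(url):
    # download a newline-separated stopword list and return it as a set
    with urllib.request.urlopen(url) as f:
        return {line.decode("utf-8").strip() for line in f if line.strip()}

sloth_url = "http://svn.sourceforge.jp/svnroot/slothlib/CSharp/Version1/SlothLib/NLP/Filter/StopWord/word/Japanese.txt"
iso_url = "https://raw.githubusercontent.com/stopwords-iso/stopwords-ja/master/stopwords-ja.txt"

# a word is dropped if it appears in either list
stopwords = load_stopwords(sloth_url) | load_stopwords(iso_url)

tagger = MeCab.Tagger("-Owakati")

with open("cleaned.txt", encoding="utf8") as fin, \
     open("cleaned_no_stopwords.txt", "w", encoding="utf8") as fout:
    for line in fin:
        # parse() expects a single string, so tokenize line by line
        tokens = tagger.parse(line).split()
        kept = [t for t in tokens if t not in stopwords]
        fout.write(" ".join(kept) + "\n")
```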
-
How to get conf value from airflow dag?
I want to get a conf value from the DAG area, but `"{{ dag_run.conf['company'] }}"` is recognized as a string. How can I get this value?
Values are passed fine when calling other dags.
```python
t_trigger = TriggerDagRunOperator(
    task_id="t-trigger",
    trigger_dag_id="example_dag",
    conf={
        "company": "{{ dag_run.conf['company'] }}",
    },
)
```
However, in the dag area the value is recognized as a string.
```python
t_task_a = PythonOperator(
    task_id="t-task-a",
    python_callable=task-a,
)

employees = Variable.get(
    "{{ dag_run.conf['company'] }}",  # problem
    default_var=['company'],
    deserialize_json=True
)

for employee in employees:
    t_employee_operator = PythonOperator(
        task_id=f"t-test-operator",
        python_callable=employee_operator,
        op_kwargs={"employee": employee}
    )
    t_task_a >> t_employee_operator
```
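For reference, a minimal sketch of how the conf value can be read at runtime inside a task, where Jinja templates are actually rendered; `{{ dag_run.conf }}` is only templated in operator fields, not in top-level DAG code, so reading it in the callable via the execution context is one way around that (the callable body here is illustrative):

```python
from airflow.operators.python import PythonOperator

def task_a(**context):
    # dag_run.conf is a plain dict inside the task's execution context
    company = context["dag_run"].conf.get("company")
    print(f"company passed via conf: {company}")

t_task_a = PythonOperator(
    task_id="t-task-a",
    python_callable=task_a,
)
```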
-
Connection failure when using airflow to run python script connecting to neo4j database
I'm trying to use airflow to orchestrate a workflow where a neo4j docker is run and then a python script to query data from the neo4j database (the code runs on AWS EC2 instance). I am able to run the neo4j docker successfully. But when I ran the task of querying the database, I got connection error as:
neo4j.exceptions.ServiceUnavailable: Connection to 127.0.0.1:7687 closed without handshake response
If I manually ran the python script on EC2, it can connect to the neo4j database without any issue. The code I am using is as below:
```python
class Neo4jConnection:
    def __init__(self, uri, user, pwd):
        self.__uri = uri
        self.__user = user
        self.__pwd = pwd
        self.__driver = None
        try:
            self.__driver = GraphDatabase.driver(self.__uri, auth=(self.__user, self.__pwd))
            logger.info('SUCCESS: Connected to the Neo4j Database.')
        except Exception as e:
            logger.info('ERROR: Could not connect to the Neo4j Database. See console for details.')
            raise SystemExit(e)

    def close(self):
        if self.__driver is not None:
            self.__driver.close()

    def query(self, query, parameters=None, db=None):
        assert self.__driver is not None, "Driver not initialized!"
        session = None
        response = None
        try:
            session = self.__driver.session(database=db) if db is not None else self.__driver.session()
            response = list(session.run(query, parameters))
        except Exception as e:
            logger.info("Query failed:", e)
        finally:
            if session is not None:
                session.close()
        return response


class LoadWikiPathway2Neo4j:
    def __init__(self):
        # replace localhost with 127.0.0.1
        self.connection = Neo4jConnection(uri="bolt://localhost:7687", user="neo4j", pwd="test")

    def loadData(self):
        WIKIPATHWAY_MOUNTED_DATA_VOLUME = "/home/ec2-user/wikipathway_neo4j/data/Human/"  # the volume is mounted to neo4j docker
        WIKIPATHWAY_DATA_DOCKER_VOLUME = "file:///var/lib/neo4j/data/Human"  # file path in neo4j docker

        # connect db
        graph = self.connection

        # only run once
        graph.query('''MATCH (n) DETACH DELETE n''')
        graph.query('''CALL n10s.graphconfig.init()''')
        graph.query('''CREATE CONSTRAINT n10s_unique_uri IF NOT EXISTS ON (r:Resource) ASSERT r.uri IS UNIQUE''')
        graph.query('''call n10s.nsprefixes.removeAll()''')
        cypher = '''WITH '@prefix biopax: <http://www.biopax.org/release/biopax-level3.owl#> . \
            @prefix cito: <http://purl.org/spar/cito/> . \
            @prefix dc: <http://purl.org/dc/elements/1.1/> . \
            @prefix dcat: <http://www.w3.org/ns/dcat#> . \
            @prefix dcterms: <http://purl.org/dc/terms/> . \
            @prefix foaf: <http://xmlns.com/foaf/0.1/> . \
            @prefix freq: <http://purl.org/cld/freq/> . \
            @prefix gpml: <http://vocabularies.wikipathways.org/gpml#> . \
            @prefix hmdb: <https://identifiers.org/hmdb/> . \
            @prefix ncbigene: <https://identifiers.org/ncbigene/> . \
            @prefix owl: <http://www.w3.org/2002/07/owl#> . \
            @prefix pav: <http://purl.org/pav/> . \
            @prefix prov: <http://www.w3.org/ns/prov#> . \
            @prefix pubmed: <http://www.ncbi.nlm.nih.gov/pubmed/> . \
            @prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> . \
            @prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> . \
            @prefix skos: <http://www.w3.org/2004/02/skos/core#> . \
            @prefix void: <http://rdfs.org/ns/void#> . \
            @prefix wp: <http://vocabularies.wikipathways.org/wp#> . \
            @prefix wprdf: <http://rdf.wikipathways.org/> . \
            @prefix xsd: <http://www.w3.org/2001/XMLSchema#> . ' as txt \
            CALL n10s.nsprefixes.addFromText(txt) yield prefix, namespace RETURN prefix, namespace'''
        graph.query(cypher)

        fileLst = os.listdir(WIKIPATHWAY_MOUNTED_DATA_VOLUME)

        # load data
        for filename in fileLst:
            filePath = f'{WIKIPATHWAY_DATA_DOCKER_VOLUME}/{filename}'
            print(filePath)
            cypher = f'''CALL n10s.rdf.import.fetch("{filePath}","Turtle")'''
            logger.info(cypher)
            graph.query(cypher)
            logger.info(f"{filename} is loaded")

        graph.close()


def load_wikipathway_files():
    data = LoadWikiPathway2Neo4j()
    data.loadData()


with DAG(
    default_args=default,
    dag_id=os.path.basename(__file__).replace(".py", ""),
    schedule_interval=None,
    catchup=False,
    start_date=days_ago(1),
    tags=['s3', 'download_wikipathway'],
) as dag:
    loadWikipathwayFile = PythonOperator(
        task_id="load_wikipathway",
        python_callable=load_wikipathway_files
    )
```
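One thing worth checking, shown as a small sketch: if the Airflow worker itself runs inside a container, `127.0.0.1`/`localhost` refers to that container rather than to the host where the Neo4j docker is listening, which would produce exactly this handshake failure. Making the bolt URI configurable (the `NEO4J_URI` environment variable name here is only an illustration) lets the DAG point at the Docker network alias or the EC2 host IP instead:

```python
import os

# Hypothetical: resolve the bolt endpoint from the environment instead of hard-coding
# 127.0.0.1, e.g. "bolt://<docker-network-alias>:7687" or "bolt://<ec2-private-ip>:7687".
NEO4J_URI = os.environ.get("NEO4J_URI", "bolt://localhost:7687")

connection = Neo4jConnection(uri=NEO4J_URI, user="neo4j", pwd="test")
```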
-
Dynamic TaskGroup in Airflow 2
I have a function `run_step` that produces a dynamic number of EMR tasks within a task group. I want to keep this function in a separate file named `helpers.py` so that other dags can use it and I don't have to rewrite it (in the examples below, I have hard-coded certain values for clarity, otherwise they would be variables):

```python
def run_step(my_group_id, config, dependencies):
    task_group = TaskGroup(group_id=my_group_id)
    for c in config:
        task_id = 'start_' + c['task_name']
        task_name = c['task_name']
        add_step = EmrAddStepsOperator(
            task_group=my_group_id,
            task_id=task_id,
            job_flow_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='emr', key='return_value') }}",
            steps=create_emr_step(args=config[c], d=dependencies[c]),
            aws_conn_id='aws_default'
        )
        wait_for_step = EmrStepSensor(
            task_group=my_group_id,
            task_id='wait_for_' + task_name + '_step',
            job_flow_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='emr', key='return_value') }}",
            step_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='" + f"{my_group_id}.{task_id}" + "', key='return_value')[0] }}"
        )
        add_step >> wait_for_step
    return task_group
```
The code in `my_dag.py` which calls this function looks like:

```python
execute_my_step = create_emr_step(
    my_group_id='execute_steps',
    config=my_tasks,
    dependencies=my_dependencies
)

some_steps >> execute_my_step
```
I am expecting this to produce a task group that contains two steps for every item in config, but it only produces one step labeled as `create_emr_step` with no task group. I tried putting the TaskGroup in the dag (and made the necessary changes to `run_step`) as shown below, but that did not work either:

```python
with TaskGroup('execute_steps') as execute_steps:
    execute_my_step = create_emr_step(
        my_group_id='execute_steps',
        config=my_tasks,
        dependencies=my_dependencies
    )
```
Is it possible to do this? I need to produce steps dynamically because our pipeline is so big. I was doing this successfully with subdags in a similar way, but can't figure out how to get this to work with task groups. Would it be easier to write my own operator?
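A minimal, hedged sketch of the pattern that generally works here (only fragments of the real helpers are shown in the question, so the operator classes below are placeholders): create the operators while the `TaskGroup` is the active context manager, rather than passing the group's `group_id` string via `task_group=`:

```python
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator  # stand-in for the EMR operators

def run_step(my_group_id, config):
    # Operators created inside the `with TaskGroup(...)` block get attached to the group;
    # the function itself must be called from inside the `with DAG(...)` block.
    with TaskGroup(group_id=my_group_id) as task_group:
        for c in config:
            add_step = PythonOperator(
                task_id=f"start_{c['task_name']}",
                python_callable=lambda: None,  # placeholder body
            )
            wait_for_step = PythonOperator(
                task_id=f"wait_for_{c['task_name']}_step",
                python_callable=lambda: None,  # placeholder body
            )
            add_step >> wait_for_step
    return task_group
```

In `my_dag.py`, calling `run_step('execute_steps', my_tasks)` inside the `with DAG(...)` block should then render the group with both steps per config item.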
-
Vertica data into pySpark throws "Failed to find data source"
I have spark 3.2, vertica 9.2.
```python
spark = SparkSession.builder.appName("Ukraine").master("local[*]")\
    .config("spark.jars", '/home/shivamanand/spark-3.2.1-bin-hadoop3.2/jars/vertica-jdbc-9.2.1-0.jar')\
    .config("spark.jars", '/home/shivamanand/spark-3.2.1-bin-hadoop3.2/jars/vertica-spark-3.2.1.jar')\
    .getOrCreate()

table = "test"
db = "myDB"
user = "myUser"
password = "myPassword"
host = "myVerticaHost"
part = "12"

opt = {"host": host, "table": table, "db": db, "numPartitions": part, "user": user, "password": password}

df = spark.read.format("com.vertica.spark.datasource.DefaultSource").options().load()
```
gives
```
Py4JJavaError: An error occurred while calling o77.load.
: java.lang.ClassNotFoundException: Failed to find data source: com.vertica.spark.datasource.DefaultSource.
Please find packages at http://spark.apache.org/third-party-projects.html

~/shivamenv/venv/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
    109     def deco(*a, **kw):
    110         try:
--> 111             return f(*a, **kw)
    112         except py4j.protocol.Py4JJavaError as e:
    113             converted = convert_exception(e.java_exception)

~/shivamenv/venv/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(
```
Before this step, I used wget to download the 2 jars into the spark jars folder (the ones referenced in the SparkSession config), which I obtained from:

https://libraries.io/maven/com.vertica.spark:vertica-spark
https://www.vertica.com/download/vertica/client-drivers/

I'm not sure what I'm doing wrong here; is there an alternative to the spark.jars option?
In the below link -
they mention
Both of these libraries are installed with the Vertica server and are available on all nodes in the Vertica cluster in the following locations:
The Spark Connector files are located in /opt/vertica/packages/SparkConnector/lib. The JDBC client library is /opt/vertica/java/vertica-jdbc.jar
Should one try to replace local folder jars with these?
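One possible cause, sketched below as an assumption rather than a verified fix: calling `.config("spark.jars", ...)` twice overwrites the first value, so only one of the two jars ends up on the classpath. Passing both paths as a single comma-separated `spark.jars` value (and actually forwarding the `opt` dictionary through `.options(**opt)`) might look like this:

```python
from pyspark.sql import SparkSession

jars = ",".join([
    "/home/shivamanand/spark-3.2.1-bin-hadoop3.2/jars/vertica-jdbc-9.2.1-0.jar",
    "/home/shivamanand/spark-3.2.1-bin-hadoop3.2/jars/vertica-spark-3.2.1.jar",
])

spark = (SparkSession.builder.appName("Ukraine").master("local[*]")
         .config("spark.jars", jars)  # one comma-separated setting instead of two overwriting calls
         .getOrCreate())

opt = {"host": "myVerticaHost", "table": "test", "db": "myDB",
       "numPartitions": "12", "user": "myUser", "password": "myPassword"}

# pass the connection options through to the reader
df = spark.read.format("com.vertica.spark.datasource.DefaultSource").options(**opt).load()
```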
-
PostgreSQL -Assign Unique ID to each groups of records, based on a condition
I am trying to group the records of a table based on a condition, and then assign, in a new column, a simple integer ID to each group obtained where the condition is met.
```
ID  TMSTP                CAT_TYPE  TELEGRAMMZAEHLER
1   2022-05-03 20:52:02  32        5004
2   2022-05-03 20:51:34  32        5002
3   2022-05-03 20:51:34  32        5001
4   2022-05-03 20:51:33  32        5000
5   2022-05-03 20:41:22  32        4996
6   2022-05-03 20:41:21  32        4995
```

I need to assign the same ID to those rows whose TELEGRAMMZAEHLER number is consecutive to the next one (for example, rows 2 and 3 have TZ 5002 and 5001, therefore they are consecutive and should belong to the same group ID).
The GRUPPE column would be my desired outcome. Rows 2 to 4 belong together in the same group ID, but then rows 5 and 6 should have another ID, because the TZ in row 5 is not consecutive with the TZ in row 4.

```
ID  TMSTP                CAT_TYPE  TELEGRAMMZAEHLER  GRUPPE
1   2022-05-03 20:52:02  32        5004              1
2   2022-05-03 20:51:34  32        5002              2
3   2022-05-03 20:51:34  32        5001              2
4   2022-05-03 20:51:33  32        5000              2
5   2022-05-03 20:41:22  32        4996              3
6   2022-05-03 20:41:21  32        4995              3
```

Any ideas on how that can be achieved in PostgreSQL?
Thank you very much!
-
What is the equivalent of the pandas DataFrame .info() function in vDataFrame?
A pandas `DataFrame` has an `info` function which shows information contained in the `DataFrame`: https://www.w3schools.com/python/pandas/ref_df_info.asp#:~:text=The%20info()%20method%20prints,method%20actually%20prints%20the%20info.

What is the equivalent function in the `vDataFrame` of `VerticaPy`? I tried `describe` and `memory_usage`, but I don't see, for example, how many values each column has.
-
The number of rows when converting a pandas DataFrame into a VDF looks considerably less than in the individual pandas DataFrames
I am working on the `titanic` data set. I read the `train.csv` and `test.csv` files using the `read_csv` function:

```python
from verticapy import *

# drop table if it already exists
drop('v_temp_schema.train', method='table')
train_vdf = read_csv("train.csv")
train_vdf

from verticapy import *

# drop table if it already exists
drop('v_temp_schema.test', method='table')
test_vdf = read_csv("test.csv")
test_vdf
```
Each `vdf` contains at least 100 rows. Then I combine the two `vdf`s:

```python
import pandas as pd

# combine is a list, but its elements are vDataFrames. concat requires the elements to be
# Series or DataFrames, so the vdfs need to be converted to pandas DataFrames first.
combine = [train_vdf.to_pandas(), test_vdf.to_pandas()]  # gives a list
combine_pdf = pd.concat(combine)
combine_pdf
```
The output seems to have `1234 rows × 12 columns` of data.

Now when I convert the `pandas` DataFrame of combined data back to a `vdf`, I seem to get far fewer rows; I see only two rows in the output. Why?

```python
set_option("sql_on", False)
combine_vdf = pandas_to_vertica(combine_pdf)
combine_vdf
```