Concatenation of unique values into a spark dataframe
I have two spark dataframes with different values that I would like to concatenate:
df:
c1 c2
A D
B E
B F
df2:
A B
key1 4
key2 5
key3 6
I would like to concatenate the unique values for certain columns in these dataframes into a single dataframe. Thus, the output would be
res:
values origin
A first
B first
key1 second
key2 second
key3 second
1 answer
-
answered 2022-01-19 17:34
blackbishop
A simple union should do the job:

import pyspark.sql.functions as F

# the question's first dataframe is named df, the second df2
df1 = df.selectExpr("c1 as value").distinct().withColumn("origin", F.lit("first"))
df2 = df2.selectExpr("A as value").distinct().withColumn("origin", F.lit("second"))

res = df1.union(df2)
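For reference, on the sample data in the question, checking the result could look like this (row order after distinct and union is not guaranteed):

res.show()
# +-----+------+
# |value|origin|
# +-----+------+
# |    A| first|
# |    B| first|
# | key1|second|
# | key2|second|
# | key3|second|
# +-----+------+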
See also questions close to this topic
-
Python File Tagging System does not retrieve nested dictionaries in dictionary
I am building a file tagging system using Python. The idea is simple. Given a directory of files (and files within subdirectories), I want to filter them out using a filter input and tag those files with a word or a phrase.
If I got the following contents in my current directory:
data/
    budget.xls
    world_building_budget.txt
a.txt
b.exe
hello_world.dat
world_builder.spec
and I execute the following command in the shell:
py -3 tag_tool.py -filter=world -tag="World-Building Tool"
My output will be:
These files were tagged with "World-Building Tool":
data/world_building_budget.txt
hello_world.dat
world_builder.spec
My current output isn't exactly like this but basically, I am converting all files and files within subdirectories into a single dictionary like this:
def fs_tree_to_dict(path_):
    file_token = ''
    for root, dirs, files in os.walk(path_):
        tree = {d: fs_tree_to_dict(os.path.join(root, d)) for d in dirs}
        tree.update({f: file_token for f in files})
        return tree
Right now, my dictionary looks like this: key: ''. In the following function, I am turning the empty values '' into empty lists (to hold my tags):

def empty_str_to_list(d):
    for k, v in d.items():
        if v == '':
            d[k] = []
        elif isinstance(v, dict):
            empty_str_to_list(v)
When I run my entire code, this is my output:
hello_world.dat ['World-Building Tool']
world_builder.spec ['World-Building Tool']
But it does not see data/world_building_budget.txt. This is the full dictionary:

{'data': {'world_building_budget.txt': []}, 'a.txt': [], 'hello_world.dat': [], 'b.exe': [], 'world_builder.spec': []}
This is my full code:
import os, argparse

def fs_tree_to_dict(path_):
    file_token = ''
    for root, dirs, files in os.walk(path_):
        tree = {d: fs_tree_to_dict(os.path.join(root, d)) for d in dirs}
        tree.update({f: file_token for f in files})
        return tree

def empty_str_to_list(d):
    for k, v in d.items():
        if v == '':
            d[k] = []
        elif isinstance(v, dict):
            empty_str_to_list(v)

parser = argparse.ArgumentParser(description="Just an example",
                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--filter", action="store", help="keyword to filter files")
parser.add_argument("--tag", action="store", help="a tag phrase to attach to a file")
parser.add_argument("--get_tagged", action="store", help="retrieve files matching an existing tag")
args = parser.parse_args()

filter = args.filter
tag = args.tag
get_tagged = args.get_tagged

current_dir = os.getcwd()
files_dict = fs_tree_to_dict(current_dir)
empty_str_to_list(files_dict)

for k, v in files_dict.items():
    if filter in k:
        if v == []:
            v.append(tag)
        print(k, v)
    elif isinstance(v, dict):
        empty_str_to_list(v)
        if get_tagged in v:
            print(k, v)
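A minimal sketch of the kind of recursion that would also reach files inside subdirectories (a hypothetical helper named tag_files, reusing the nested dictionary and the os import above; not the asker's code):

def tag_files(d, filter_word, tag_word, prefix=""):
    # Walk the nested dictionary: dict values are directories, list values are a file's tags
    for name, value in d.items():
        path = os.path.join(prefix, name)
        if isinstance(value, dict):
            tag_files(value, filter_word, tag_word, prefix=path)
        elif filter_word in name:
            if tag_word not in value:
                value.append(tag_word)
            print(path, value)

# Example: tag_files(files_dict, filter, tag)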
-
Actually, I am working on a project and it is showing "no module named pip_internal". Please help me with this.
File "C:\Users\pjain\AppData\Local\Programs\Python\Python310\lib\runpy.py", line 196, in _run_module_as_main return _run_code(code, main_globals, None, File "C:\Users\pjain\AppData\Local\Programs\Python\Python310\lib\runpy.py", line 86, in _run_code exec(code, run_globals) File "C:\Users\pjain\AppData\Local\Programs\Python\Python310\Scripts\pip.exe\__main__.py", line 4, in <module> File "C:\Users\pjain\AppData\Local\Programs\Python\Python310\lib\site-packages\pip\_internal\__init__.py", line 4, in <module> from pip_internal.utils import _log
I am using PyCharm with a conda interpreter.
-
Looping the function if the input is not string
I'm new to Python (first of all). I have a homework assignment to write a function that checks whether an item exists in a dictionary or not.
inventory = {"apple": 50, "orange": 50, "pineapple": 70, "strawberry": 30}

def check_item():
    x = input("Enter the fruit's name: ")
    if not x.isalpha():
        print("Error! You need to type the name of the fruit")
    elif x in inventory:
        print("Fruit found:", x)
        print("Inventory available:", inventory[x], "KG")
    else:
        print("Fruit not found")

check_item()
I want the function to loop again only if the input is not an alphabetic string. I've tried adding return under print("Error! You need to type the name of the fruit") but it didn't work. Help!
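One possible way to keep asking until the input is alphabetic is a while loop; a minimal sketch using the same inventory dictionary as above:

def check_item():
    # Re-prompt only while the input is not a purely alphabetic name
    while True:
        x = input("Enter the fruit's name: ")
        if x.isalpha():
            break
        print("Error! You need to type the name of the fruit")
    if x in inventory:
        print("Fruit found:", x)
        print("Inventory available:", inventory[x], "KG")
    else:
        print("Fruit not found")

check_item()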
-
How to bring data frame into single column from multiple columns in python
I have data in the format below, spread across multiple columns. I want to bring all 4 pcp columns of data into a single column.
YEAR  Month  pcp1  pcp2  pcp3  pcp4
1984  1      0     0     0     0
1984  2      1.2   0     0     0
1984  3      0     0     0     0
1984  4      0     0     0     0
1984  5      0     0     0     0
1984  6      0     0     0     1.6
1984  7      3     3     9.2   3.2
1984  8      6.2   27.1  5.4   0
1984  9      0     0     0     0
1984  10     0     0     0     0
1984  11     0     0     0     0
1984  12     0     0     0     0
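Assuming the table above is a pandas DataFrame named df, a minimal sketch with melt would stack the four pcp columns into one value column:

import pandas as pd

# Keep YEAR and Month as identifiers and stack pcp1..pcp4 into a single column
long_df = df.melt(id_vars=["YEAR", "Month"],
                  value_vars=["pcp1", "pcp2", "pcp3", "pcp4"],
                  var_name="pcp", value_name="value")
print(long_df)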
-
separate datetime column in R while keeping time accurate
4/12/2016 12:00:00 AM

I have dates in the format above and have tried to use separate() to create two columns in the data frame where the data is present. When I do, the columns are created, but the AM/PM is lost, so the times just become numbers or, worse, appear as "12H 0M 0S". Can anyone help me out? I'm pretty new to data analysis as a whole and it would be much appreciated!
-
How do I implement rank function for nearest values for a column in dataframe?
df.head():

   run_time                    match_datetime  country         league           home_team             away_team
0  2021-08-07 00:04:36.326391  2021-08-06      Russia          FNL 2 - Group 2  Yenisey 2             Lokomotiv-Kazanka
1  2021-08-07 00:04:36.326391  2021-08-07      Russia          Youth League     Ural U19              Krylya Sovetov Samara U19
2  2021-08-07 00:04:36.326391  2021-08-08      World           Club Friendly    Alaves                Al Nasr
3  2021-08-07 00:04:36.326391  2021-08-09      China           Jia League       Chengdu Rongcheng     Shenyang Urban FC
4  2021-08-06 00:04:36.326391  2021-08-06      China           Super League     Wuhan FC              Tianjin Jinmen Tiger
5  2021-08-06 00:04:36.326391  2021-08-07      Czech Republic  U19 League       Sigma Olomouc U19     Karvina U19
6  2021-08-06 00:04:36.326391  2021-08-08      Russia          Youth League     Konoplev Academy U19  Rubin Kazan U19
7  2021-08-06 00:04:36.326391  2021-08-09      World           Club Friendly    Real Sociedad         Eibar
desired df
   run_time                    match_datetime  country         league           home_team          away_team
0  2021-08-07 00:04:36.326391  2021-08-06      Russia          FNL 2 - Group 2  Yenisey 2          Lokomotiv-Kazanka
1  2021-08-07 00:04:36.326391  2021-08-07      Russia          Youth League     Ural U19           Krylya Sovetov Samara U19
4  2021-08-06 00:04:36.326391  2021-08-06      China           Super League     Wuhan FC           Tianjin Jinmen Tiger
5  2021-08-06 00:04:36.326391  2021-08-07      Czech Republic  U19 League       Sigma Olomouc U19  Karvina U19
How do I use the rank function to filter only the 2 nearest match_datetime dates for every run_time value? I.e., the desired dataframe will be a filtered dataframe that has the nearest 2 match_datetime values for every run_time.
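A minimal pandas sketch of one way to do this, assuming run_time and match_datetime can be parsed as datetimes (column names as shown above):

import pandas as pd

df["run_time"] = pd.to_datetime(df["run_time"])
df["match_datetime"] = pd.to_datetime(df["match_datetime"])

# Rank rows within each run_time by how close match_datetime is to run_time,
# then keep the two nearest matches per run_time
gap = (df["match_datetime"] - df["run_time"]).abs()
df["rank"] = gap.groupby(df["run_time"]).rank(method="first")
res = df[df["rank"] <= 2].drop(columns=["rank"])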
-
Apache Spark Dataframe - Get length of each column
Question: In an Apache Spark Dataframe, using Python, how can we get the data type and length of each column? I'm using the latest version of Python.

Using a pandas dataframe, I do it as follows:

df = pd.read_csv(r'C:\TestFolder\myFile1.csv', low_memory=False)
for col in df:
    print(col, '->', df[col].str.len().max())
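A minimal PySpark sketch of the equivalent (assuming a SparkSession named spark and that the columns are read as strings):

import pyspark.sql.functions as F

df = spark.read.csv(r'C:\TestFolder\myFile1.csv', header=True)

# Data type of each column
print(df.dtypes)

# Maximum string length per column
df.select([F.max(F.length(F.col(c))).alias(c) for c in df.columns]).show()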
-
Spark: retrieving old values of rows after casting made invalid input nulls
I am having trouble retrieving the old value of a column in Spark before a cast. Initially, all my inputs are strings, and I want to cast the column num1 to a double type. However, when the cast is applied to anything that is not a double, Spark changes it to null.
Currently, I have dataframes

df1:
num1  unique_id
1     id1
a     id2
2     id3

and a copy of df1, df1_copy, where the cast is made.
when running
df1_copy = df1_copy.select(df1_copy.col('num1').cast('double'), df1_copy.col('unique_id'))
it returns df1_copy:
num1  unique_id
1     id1
null  id2
2     id3

I have tried putting it into a different dataframe using select and when, but I get an error about not being able to find the column num1. The following is what I tried:
df2 = df1_copy.select(when(df1_copy.col("unique_id").equalTo(df1.col("unique_id")), df1.col('num1')).alias("invalid"), df1_copy.col('unique_id'))
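One way around this, rather than comparing two dataframes, is to keep the original string column next to the cast so the pre-cast value is still available; a minimal PySpark sketch (not the asker's code):

from pyspark.sql import functions as F

df2 = (df1
       .withColumn("num1_double", F.col("num1").cast("double"))
       .withColumn("invalid", F.when(F.col("num1_double").isNull(), F.col("num1"))))

# Rows where the cast failed keep their old value in the "invalid" column
df2.show()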
-
spark-shell commands throwing error : “error: not found: value spark”
:14: error: not found: value spark
       import spark.implicits._
              ^
:14: error: not found: value spark
       import spark.sql
              ^
Here is my environment configuration. I have tried many different things, but I keep getting this error. Does anyone know the reason? I saw a similar question, but the answers did not solve my problem.
JAVA_HOME : C:\Program Files\Java\jdk1.8.0_51
HADOOP_HOME : C:\Hadoop\winutils-master\hadoop-2.7.1
SPARK_HOME : C:\Hadoop\spark-2.2.0-bin-hadoop2.7
PATH :%JAVA_HOME%\bin;%SCALA_HOME%\bin;%HADOOP_HOME%\bin;%SPARK_HOME%\bin;
-
Computing number of business days between start/end columns
I have two Dataframes:
- facts: columns data, start_date and end_date
- holidays: column holiday_date
What I want is a way to produce another Dataframe that has columns: data, start_date, end_date and num_holidays.
Where num_holidays is computed as: the number of days between start and end that are not weekends or holidays (as in the holidays table).

The solution is here if we wanted to do this in PL/SQL. The crux is this part of the code:
--Calculate and return the number of workdays using the input parameters.
--This is the meat of the function.
--This is really just one formula with a couple of parts that are listed on separate lines for documentation purposes.
RETURN (
    SELECT
        --Start with total number of days including weekends
        (DATEDIFF(dd, @StartDate, @EndDate) + 1)
        --Subtract 2 days for each full weekend
        - (DATEDIFF(wk, @StartDate, @EndDate) * 2)
        --If StartDate is a Sunday, Subtract 1
        - (CASE WHEN DATENAME(dw, @StartDate) = 'Sunday' THEN 1 ELSE 0 END)
        --If EndDate is a Saturday, Subtract 1
        - (CASE WHEN DATENAME(dw, @EndDate) = 'Saturday' THEN 1 ELSE 0 END)
        --Subtract all holidays
        - (SELECT COUNT(*) FROM [dbo].[tblHolidays]
           WHERE [HolDate] BETWEEN @StartDate AND @EndDate)
    )
END
I'm new to pyspark and was wondering what's an efficient way to do this. I can post the UDF I'm writing if it helps, though I'm going slowly because I feel it's the wrong thing to do:
- Is there a better way than creating a UDF that reads the holidays table into a Dataframe and joins with it to count the holidays? Can I even join inside a UDF? (See the sketch after this list.)
- Is there a way to write a pandas_udf instead? Would it be fast enough?
- Are there some optimizations I can apply, like caching the holidays table somehow on every worker?
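A minimal PySpark sketch of the join-based idea from the first bullet (assuming DataFrames named facts and holidays with the columns described above, date-typed start/end columns, and Spark 2.4+ for sequence):

from pyspark.sql import functions as F

# One row per calendar day between start_date and end_date
days = facts.withColumn("day", F.explode(F.sequence("start_date", "end_date")))

# Drop weekends (dayofweek: 1 = Sunday, 7 = Saturday), drop holiday dates,
# then count what is left for each fact row
res = (days
       .filter(~F.dayofweek("day").isin(1, 7))
       .join(holidays, days["day"] == holidays["holiday_date"], "left_anti")
       .groupBy("data", "start_date", "end_date")
       .agg(F.count("day").alias("num_holidays")))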
-
What are the right memory allocations for multiple Spark streaming jobs processed in a single EMR cluster (m5.xlarge)?
I have 12 Spark streaming jobs, and each receives a small amount of data at any time. These scripts include Spark transformations and joins.

What are the right memory allocations (num-executors, executor-memory, etc.) for these Spark streaming jobs if they are processed in a single EMR cluster (m5.xlarge, not using EMR steps)?

Please explain how these Spark jobs work in the cluster. How will the cluster split resources between these jobs? Please help me with the basics.
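For reference, a hypothetical sketch of where such allocations are set per job (placeholder values, not tuned recommendations; driver memory usually has to be set on spark-submit before the driver JVM starts):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("streaming-job-1")
         .config("spark.executor.instances", "1")  # corresponds to num-executors
         .config("spark.executor.cores", "1")
         .config("spark.executor.memory", "1g")
         .getOrCreate())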
-
Error Converting RDD into DataFrame in PySpark
I am trying to turn an RDD into a DataFrame. The operation seems to be successful, but when I then try to count the number of elements in the DataFrame I get an error. This is my code:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import col

sc = SparkContext(appName='ANALYSIS', master='local')

rdd = sc.textFile('file.csv')
rdd = rdd.filter(lambda line: line != header)
rdd = rdd.map(lambda line: line.rsplit(',', 6))

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("ANALYSIS") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

feature = ['to_drop', 'watched', 'watching', 'wantwatch', 'dropped', 'rating', 'votes']
df = spark.createDataFrame(rdd, schema=feature)

rdd.collect()  # it works
df.show()      # it works
df.count()     # does not work
Can someone kindly point out any errors to me? Thanks.

The error I encounter during execution is the following:
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-15-3c9a60fd698f> in <module>
----> 1 df.count()

/opt/conda/lib/python3.8/site-packages/pyspark/sql/dataframe.py in count(self)
    662         2
    663         """
--> 664         return int(self._jdf.count())
    665
    666     def collect(self):

/opt/conda/lib/python3.8/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1302
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306

/opt/conda/lib/python3.8/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
    109     def deco(*a, **kw):
    110         try:
--> 111             return f(*a, **kw)
    112         except py4j.protocol.Py4JJavaError as e:
    113             converted = convert_exception(e.java_exception)

/opt/conda/lib/python3.8/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)
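Note that df.show() only evaluates the first few rows, while df.count() forces every row through the 7-column schema, so one common cause is a line whose rsplit does not yield exactly 7 fields, which would only surface at count(). A minimal sketch of a way to sidestep the manual parsing (assuming file.csv has a header row):

df = spark.read.csv('file.csv', header=True, inferSchema=True)
df.count()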