Can you construct pyspark.pandas.DataFrame from pyspark.sql.dataframe.DataFrame?
I am new to Spark / Databricks. My question is whether it is recommended / possible to mix SQL and Pandas API dataframes. Is it possible to create a pyspark.pandas.DataFrame directly from a pyspark.sql.dataframe.DataFrame, or do I need to re-read the parquet file?
# Suppose you have a Spark SQL dataframe (here I read the Boston Safety Data from Azure Open Datasets)
blob_account_name = "azureopendatastorage"
blob_container_name = "citydatacontainer"
blob_relative_path = "Safety/Release/city=Boston"
blob_sas_token = r""
wasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)
spark.conf.set('fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name), blob_sas_token)
print('Remote blob path: ' + wasbs_path)
df = spark.read.parquet(wasbs_path)
# Convert df to a pyspark.pandas.DataFrame
df2 = # ...?
I tried df.toPandas(), but that is not what I want, because it converts the data to a plain, undistributed pandas.core.frame.DataFrame.
A workaround is to read the parquet file again into a pyspark.pandas.DataFrame, which I would like to avoid.
Thanks!
1 answer
-
answered 2022-05-04 11:23
ScootCork
IIUC you are looking to convert a Spark dataframe to a pandas-on-Spark dataframe. You can do so with the to_pandas_on_spark method:
df2 = df.to_pandas_on_spark()
print(type(df2))
<class 'pyspark.pandas.frame.DataFrame'>
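Two small additions that may help (hedged, from memory rather than from the question): newer Spark releases also expose the same conversion as DataFrame.pandas_api(), with to_pandas_on_spark() deprecated in its favour, and a pandas-on-Spark frame can be turned back into a plain Spark DataFrame with to_spark(). A minimal round-trip sketch, reusing spark and wasbs_path from the question:
# Assumes the `spark` session and `wasbs_path` defined in the question above.
df = spark.read.parquet(wasbs_path)   # pyspark.sql.DataFrame

pdf = df.to_pandas_on_spark()         # distributed pandas-on-Spark DataFrame
# pdf = df.pandas_api()               # newer name for the same conversion, where available

print(type(pdf))                      # <class 'pyspark.pandas.frame.DataFrame'>
print(pdf.shape)                      # pandas-style operations stay distributed

sdf = pdf.to_spark()                  # back to a plain Spark DataFrame when needed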
See also questions close to this topic
-
How to bring data frame into single column from multiple columns in python
I have data in the multi-column format below, and I want to bring all 4 pcp columns of data into a single column.
YEAR Month pcp1 pcp2 pcp3 pcp4
1984  1    0    0    0    0
1984  2    1.2  0    0    0
1984  3    0    0    0    0
1984  4    0    0    0    0
1984  5    0    0    0    0
1984  6    0    0    0    1.6
1984  7    3    3    9.2  3.2
1984  8    6.2  27.1 5.4  0
1984  9    0    0    0    0
1984  10   0    0    0    0
1984  11   0    0    0    0
1984  12   0    0    0    0
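A hedged sketch of one way to do this with pandas.melt, assuming the data above is a pandas DataFrame named df with exactly the columns shown:
import pandas as pd

# Assumes `df` has columns YEAR, Month, pcp1..pcp4 as in the sample above.
long_df = pd.melt(
    df,
    id_vars=["YEAR", "Month"],
    value_vars=["pcp1", "pcp2", "pcp3", "pcp4"],
    var_name="station",
    value_name="pcp",
)
print(long_df.head())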
-
separate datetime column in R while keeping time accurate
4/12/2016 12:00:00 AM
I have dates in the format above and have tried to use separate() to create two columns in the data frame where the data is present. When I do, the columns are created but the AM/PM part is lost, so the times just become numbers or, worse, appear as "12H 0M 0S". Can anyone help me out? I'm pretty new to data analysis as a whole, so any help would be much appreciated!
-
How do I implement rank function for nearest values for a column in dataframe?
df.head():
   run_time                    match_datetime  country         league           home_team             away_team
0  2021-08-07 00:04:36.326391  2021-08-06      Russia          FNL 2 - Group 2  Yenisey 2             Lokomotiv-Kazanka
1  2021-08-07 00:04:36.326391  2021-08-07      Russia          Youth League     Ural U19              Krylya Sovetov Samara U19
2  2021-08-07 00:04:36.326391  2021-08-08      World           Club Friendly    Alaves                Al Nasr
3  2021-08-07 00:04:36.326391  2021-08-09      China           Jia League       Chengdu Rongcheng     Shenyang Urban FC
4  2021-08-06 00:04:36.326391  2021-08-06      China           Super League     Wuhan FC              Tianjin Jinmen Tiger
5  2021-08-06 00:04:36.326391  2021-08-07      Czech Republic  U19 League       Sigma Olomouc U19     Karvina U19
6  2021-08-06 00:04:36.326391  2021-08-08      Russia          Youth League     Konoplev Academy U19  Rubin Kazan U19
7  2021-08-06 00:04:36.326391  2021-08-09      World           Club Friendly    Real Sociedad         Eibar
Desired df:
   run_time                    match_datetime  country         league           home_team          away_team
0  2021-08-07 00:04:36.326391  2021-08-06      Russia          FNL 2 - Group 2  Yenisey 2          Lokomotiv-Kazanka
1  2021-08-07 00:04:36.326391  2021-08-07      Russia          Youth League     Ural U19           Krylya Sovetov Samara U19
4  2021-08-06 00:04:36.326391  2021-08-06      China           Super League     Wuhan FC           Tianjin Jinmen Tiger
5  2021-08-06 00:04:36.326391  2021-08-07      Czech Republic  U19 League       Sigma Olomouc U19  Karvina U19
How do I use the rank function to filter only the 2 nearest match_datetime dates for every run_time value? I.e. the desired dataframe will be a filtered dataframe that has the 2 nearest match_datetime values for every run_time.
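A hedged pandas sketch (not a tested answer), assuming df is the pandas DataFrame printed above:
import pandas as pd

# Assumes `df` has the run_time and match_datetime columns shown above.
df["run_time"] = pd.to_datetime(df["run_time"])
df["match_datetime"] = pd.to_datetime(df["match_datetime"])

# Distance (in seconds) between each match_datetime and its row's run_time,
# ranked within each run_time group; keep the two nearest rows per run_time.
closeness = (df["match_datetime"] - df["run_time"]).abs().dt.total_seconds()
df["rank"] = closeness.groupby(df["run_time"]).rank(method="first")
desired = df[df["rank"] <= 2].drop(columns="rank")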
-
Apache Spark Dataframe - Get length of each column
Question: In an Apache Spark Dataframe, using Python, how can we get the data type and length of each column? I'm using the latest version of Python.
Using a pandas dataframe, I do it as follows:
df = pd.read_csv(r'C:\TestFolder\myFile1.csv', low_memory=False)
for col in df:
    print(col, '->', df[col].str.len().max())
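For the Spark side, a hedged sketch (assuming the Spark DataFrame is called df and its columns hold strings; non-string columns would need a cast first):
from pyspark.sql import functions as F

# For each column, print its Spark data type and the maximum string length.
for name, dtype in df.dtypes:
    max_len = df.select(F.max(F.length(F.col(name))).alias("len")).first()["len"]
    print(name, dtype, '->', max_len)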
-
Spark: retrieving old values of rows after casting made invalid input nulls
I am having trouble retrieving the old value of a column before a cast in Spark. Initially, all my inputs are strings and I want to cast the column num1 to a double type. However, when a value cannot be cast to a double, Spark changes it to null.
Currently, I have dataframe df1:
num1  unique_id
1     id1
a     id2
2     id3
and a copy of df1, df1_copy, where the cast is made.
when running
df1_copy = df1_copy.select(df1_copy.col('num1').cast('double'), df1_copy.col('unique_id'))
it returns df1_copy:
num1  unique_id
1     id1
null  id2
2     id3
I have tried putting it into a different dataframe using select and when, but I get an error about not being able to find the column num1. The following is what I tried:
df2 = df1_copy.select(when(df1_copy.col("unique_id").equalTo(df1.col("unique_id")),df1.col('num1)).alias("invalid"), df1_copy.col('unique_id'))
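A hedged sketch of one way around this: keep the original string column alongside the cast in the same dataframe, so the pre-cast value is still available wherever the cast produced null (column names follow the question):
from pyspark.sql import functions as F

# Keep both the original string and the cast value in one dataframe.
df1_cast = df1.withColumn("num1_double", F.col("num1").cast("double"))

# Rows whose original value could not be cast to double.
invalid = (df1_cast
           .filter(F.col("num1_double").isNull() & F.col("num1").isNotNull())
           .select(F.col("num1").alias("invalid"), "unique_id"))
invalid.show()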
-
spark-shell commands throwing error : “error: not found: value spark”
:14: error: not found: value spark import spark.implicits._ ^ :14: error: not found: value spark import spark.sql ^
Here is my environment configuration. I have tried different configurations many times, but I keep getting this error. Does anyone know the reason? I saw a similar question but the answers did not solve my problem.
JAVA_HOME : C:\Program Files\Java\jdk1.8.0_51
HADOOP_HOME : C:\Hadoop\winutils-master\hadoop-2.7.1
SPARK_HOME : C:\Hadoop\spark-2.2.0-bin-hadoop2.7
PATH :%JAVA_HOME%\bin;%SCALA_HOME%\bin;%HADOOP_HOME%\bin;%SPARK_HOME%\bin;
-
Computing number of business days between start/end columns
I have two Dataframes:
- facts: columns data, start_date and end_date
- holidays: column holiday_date
What I want is a way to produce another Dataframe that has columns data, start_date, end_date and num_holidays, where num_holidays is computed as the number of days between start and end that are not weekends or holidays (as in the holidays table).
The solution is here if we wanted to do this in PL/SQL. The crux is this part of the code:
--Calculate and return the number of workdays using the input parameters.
--This is the meat of the function.
--This is really just one formula with a couple of parts that are listed on separate lines for documentation purposes.
RETURN (
    SELECT
        --Start with total number of days including weekends
        (DATEDIFF(dd,@StartDate, @EndDate)+1)
        --Subtact 2 days for each full weekend
        -(DATEDIFF(wk,@StartDate, @EndDate)*2)
        --If StartDate is a Sunday, Subtract 1
        -(CASE WHEN DATENAME(dw, @StartDate) = 'Sunday' THEN 1 ELSE 0 END)
        --If EndDate is a Saturday, Subtract 1
        -(CASE WHEN DATENAME(dw, @EndDate) = 'Saturday' THEN 1 ELSE 0 END)
        --Subtract all holidays
        -(Select Count(*) from [dbo].[tblHolidays]
          where [HolDate] between @StartDate and @EndDate )
)
END
I'm new to pyspark and was wondering what the efficient way to do this is. I can post the UDF I'm writing if it helps, though I'm going slow because I feel it's the wrong thing to do:
- Is there a better way than creating a UDF that reads the holidays table into a Dataframe and joins with it to count the holidays? Can I even join inside a UDF? (See the join-based sketch after this list.)
- Is there a way to write a pandas_udf instead? Would it be fast enough?
- Are there some optimizations I can apply, like caching the holidays table somehow on every worker?
-
What is the right memory allocations that can be given to multiple spark streaming jobs if it is being processed in a single EMR cluster (m5.xlarge)?
I have 12 Spark streaming jobs, each of which receives a small amount of data at any time. These scripts have Spark transformations and joins.
What are the right memory allocations for these Spark streaming jobs if they are being processed in a single EMR cluster (m5.xlarge), not using EMR steps? The memory allocation includes num-executors, executor-memory, etc.
Please explain how these Spark jobs work in the cluster. How will the cluster split resources between these jobs? Please help me with the basics.
-
Databricks Spark SQL statement failing
I'm writing a dynamic CASE statement: when me_col_name (a column) is me_id or mode (names of columns), execute one SELECT statement; when me_col_name is another column, say mep_id, it should run a SELECT statement with a join.
Can anyone tell me where I'm wrong in this statement, or help with an answer?
This is the statement:
spark.sql(" INSERT INTO etl.me_change_log ( SELECT DISTINCT "+ me_col_name + ","+"updated_datetime" + ","+ "'" + table_name + "'" + " from "+ table_name+"_INCR CASE WHEN " +me_col_name+ "= ME_ID )THEN select distinct "+ me_col_name + "," + "created_datetime,"+ "'" + table_name + "'" +" from "+ table_name+"_INCR WHEN " + me_col_name + "= MODE_EXECUTION_ID THEN SELECT 'distinct "+ me_col_name + "," + "created_datetime,"+ "'" + table_name + "'" +" from "+ table_name+"_INCR WHEN " + me_col_name +"= MODE_EXECUTION_POINT_ID THEN SELECT 'DISTINCT mep.mode_execution_id, mep.updated_datetime,"+ "'" + table_name + "'" + " from dne4.mode_execution_point as mep inner join dne4.me_appointment_history as mah on mep.id = mah.MODE_EXECUTION_POINT_ID ELSE NULL END as" + table_name+ " from me_change_log) ")
-
Databricks delta live tables stuck when ingest file from S3
I'm new to Databricks and just created a Delta Live Tables pipeline to ingest 60 million JSON files from S3. However, the input rate (the number of files it reads from S3) is stuck at around 8 records/s, which is very low IMO. I have increased the number of workers/cores in my Delta Live Tables pipeline but the input rate stays the same.
Is there any config that I have to add to increase the input rate for my pipeline?
-
Unable to fetch secrets using Instance Profile from databricks for a spring boot application
I am using spring-cloud-starter-aws-secrets-manager-config 2.3.3 for a Spring Boot application which works perfectly locally, pointing to the stage environment, where I configure AWS_PROFILE to fetch secrets.
spring.config.import=aws-secretsmanager:service/xyz/stage-v2/email/smtp;service/xyz/stage-v2/ex/db/token

<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-starter-aws-secrets-manager-config</artifactId>
    <version>2.3.3</version>
</dependency>
In the stage environment I am setting up an instance profile and assuming a role as: "spark.hadoop.fs.s3a.stsAssumeRole.arn=arn:aws:iam::XXXXXXXXX:role/databricksYYY"
Other applications run with the same role setup, but they are not Spring based.
However, the application fails to fetch secret values with the error below:
Config data resource '[AwsSecretsManagerConfigDataResource@71560f51 context = 'service/xyz/stage-v2/ex/db/token', optional = false]' via location 'aws-secretsmanager:service/xyz/stage-v2/email/smtp;service/xyz/stage-v2/ex/db/token' does not exist
Please let me know what should be done so that Spring recognizes the instance profile.
-
Azure Synapse Notebooks Vs Azure Databricks notebooks
I was going through the features of Azure Synapse Notebooks Vs Azure Databricks notebooks.
- Are there any major differences between these, apart from the component they belong to?
- Are there any scenarios where one is more appropriate than the other?
-
Load Data Using Azure Batch Service and Spark Databricks
I have a file in Azure Blob Storage that I need to load daily into the Data Lake. I am not clear on which approach I should use (an Azure Batch Account with a Custom Activity, Databricks, or a Copy Activity). Please advise me.
-
How do I Insert Overwrite with parquet format?
I have two parquet files in Azure Data Lake Gen2 and I want to INSERT OVERWRITE one with the other. I was trying to do this in Azure Databricks as below.
Reading from the data lake into Spark:
val cs_mbr_prov_1=spark.read.format("parquet").option("header","true").load(s"${SourcePath}/cs_mbr_prov")
cs_mbr_prov_1.createOrReplaceTempView("cs_mbr_prov")
val cs_mbr_prov_stg=spark.read.format("parquet").option("header","true").load(s"${SourcePath}/cs_mbr_prov_stg_1")
cs_mbr_prov_stg.createOrReplaceTempView("cs_mbr_prov_stg")
var res = spark.sql(s"INSERT OVERWRITE TABLE cs_br_prov " +
  s"SELECT NAMED_STRUCT('IND_ID',stg.IND_ID,'CUST_NBR',stg.CUST_NBR,'SRC_ID',stg.SRC_ID, " +
  s"'SRC_SYS_CD',stg.SRC_SYS_CD,'OUTBOUND_ID',stg.OUTBOUND_ID,'OPP_ID',stg.OPP_ID, " +
  s"'CAMPAIGN_CD',stg.CAMPAIGN_CD,'TREAT_KEY',stg.TREAT_KEY,'PROV_KEY',stg.PROV_KEY, " +
  s"'INSERTDATE',stg.INSERTDATE,'UPDATEDATE',stg.UPDATEDATE,'CONTACT_KEY',stg.CONTACT_KEY) AS key, " +
  s"stg.MEM_KEY,stg.INDV_ID,stg.MBR_ID,stg.OPP_DT,stg.SEG_ID,stg.MODA,stg.E_KEY, " +
  s"stg.TREAT_RUNDATETIME from cs_br_prov_stg stg")
Error I am getting:
AnalysisException: unknown requires that the data to be inserted have the same number of columns as the target table: target table has 20 column(s) but the inserted data has 9 column(s), including 0 partition column(s) having constant value(s).
But both have the same number of columns.