For Loop keeps restarting in EMR (pyspark)

I have a nested for loop. The inner loop performs operations on a dataframe 10 times, and once the inner loop finishes, the resulting 10 dataframes are unioned into a single dataframe.

UPDATE: I use a dictionary to build up a collection of dataframes, storing the result of each operation in it, and then union them at the end of the inner loop.

It then writes the result to a parquet file named with the iteration number of the outer loop. The outer loop has 6 iterations and should therefore produce 6 parquet files.

It goes something like this:

train = 0
for i in range(0, 6):
    train = train + 30
    # For loop to aggregate input and create 10 output dataframes
    dfnames = {}
    for j in range(0, 10):
        ident = "_" + str(j)
        # Load dataframe of around 1M rows
        df = spark.read.parquet("s3://path")
        dfnames['df' + ident] =  # Perform aggregations and operations
    # Combine the 10 dataframes into a single df
    df_out = dfnames['df_0'].unionByName(dfnames['df_1']).unionByName(dfnames['df_2'])...unionByName(dfnames['df_9'])
    # Write to output parquet file
    df_out.write.mode('overwrite').parquet("s3://path/" + str(train) + ".parquet")

It seems to work fine until it finishes the 3rd iteration of the outer loop. Then, for some reason, it restarts the loop with another attempt ID. So I get the first 3 files, but instead of going on to the 4th iteration, it starts over and produces the first file all over again. I don't get any failed stages or jobs.

I have tried running the for loops alone with dummy variables and print statements (without loading the large dataframes, etc.) and they run fine to completion. I suspect it has something to do with how the memory is being flushed after a loop.
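In case it helps with diagnosis: when a Spark driver on YARN dies (for example under memory pressure), YARN can transparently relaunch the whole application under a new attempt ID, which looks exactly like the job restarting from the top with no failed stages reported. A hedged sketch of how to make such a failure surface as an error instead of a silent re-run, assuming the job is launched via spark-submit (the file name `my_job.py` is illustrative):

```shell
# Cap YARN application attempts at 1 so a driver failure fails the job
# outright rather than relaunching it under a new attempt ID.
spark-submit \
  --conf spark.yarn.maxAppAttempts=1 \
  my_job.py
```

With the default of multiple attempts, the first three parquet files would already exist from attempt 1, and attempt 2 would start rewriting them from the beginning, matching the behavior described above.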

These are my EMR Spark running conditions: I am running this on an EMR cluster with 5 executors, 5 driver nodes, and 10 instances with a total of 50 cores. The Spark executor and driver memory is 45G each, with a total of about 583G. The typical shuffle read is 250G and shuffle write is 331G.

Some of the pertinent Spark environment variables are shown below:

(Screenshot of Spark environment variables omitted.)

Is there something I am doing wrong with regards to the loop or memory management? Any insight would be greatly appreciated!

1 answer

  • answered 2020-11-25 03:14 srikanth holur

    How are you getting your df1, df2... before this line?

    #Combine the 10 dataframes into a single df
    df_out = df1.unionByName(df2).unionByName(df3)...unionByName(df10)

    My guess is that your dataframe's query plan is growing large, and that might be causing issues.

    I would suggest creating a list of dataframes in the inner loop and using the reduce method to union them.

    Something like below

    from functools import reduce
    from pyspark.sql import DataFrame

    df_list = []
    for j in range(0, 10):
        # Load dataframe of around 1M rows
        df = spark.read.parquet("s3://path")
        transformed_df =  # do your transforms
        df_list.append(transformed_df)

    final_df = reduce(DataFrame.unionByName, df_list)
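For what it's worth, `reduce` simply folds the binary function pairwise across the list. The mechanics can be seen with plain Python lists standing in for dataframes, where list concatenation plays the role of `unionByName` (a stand-in illustration, not Spark code):

```python
from functools import reduce

# Stand-in: lists play the role of DataFrames, and concatenation
# plays the role of unionByName.
def union(a, b):
    return a + b

parts = [[1, 2], [3], [4, 5]]

# reduce applies union pairwise: union(union(parts[0], parts[1]), parts[2])
combined = reduce(union, parts)
print(combined)  # [1, 2, 3, 4, 5]
```

The point of this pattern in Spark is that each `unionByName` still extends the plan, but building the inputs in a flat list keeps the loop body simple and makes it easy to persist or checkpoint intermediate results if the plan grows too large.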