How to filter only the rows that have changed? Spark Scala

My Raw data set:

    type        key          tg        created_at    timestamp            row_number

    device_id   essentials   template  1600269347    2020-09-21 19:08:05  1
    device_id   experiment   t1        1599721314    2020-09-17 01:37:17  1
    device_id   experiment   v1        1600228007    2020-09-21 18:07:53  2
    device_id   experiment   c1        1605221085    2020-09-21 18:07:53  3
    test        t_key        t1        1599714939    2020-09-16 01:37:55  1
    test        t_key        t2        1600084857    2020-09-21 17:08:23  2

Steps applied so far:

    import java.sql.Timestamp

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._

    val schema = List(
      StructField("type", StringType, true),
      StructField("key", StringType, true),
      StructField("tg", StringType, true),
      StructField("created_at", IntegerType, true),
      StructField("timestamp", TimestampType, true),
      StructField("row_number", IntegerType, true)
    )

    val data = Seq(
      Row("device_id", "essentials", "template", 1600269347, Timestamp.valueOf("2020-09-21 19:08:05"), 1),
      Row("device_id", "experiment", "t1", 1599721314, Timestamp.valueOf("2020-09-17 01:37:17"), 1),
      Row("device_id", "experiment", "v1", 1600228007, Timestamp.valueOf("2020-09-21 18:07:53"), 2),
      Row("device_id", "experiment", "c1", 1605221085, Timestamp.valueOf("2020-09-21 18:07:53"), 3),
      Row("test", "t_key", "t1", 1599714939, Timestamp.valueOf("2020-09-16 01:37:55"), 1),
      Row("test", "t_key", "t2", 1600084857, Timestamp.valueOf("2020-09-21 17:08:23"), 2)
    )

    val test = spark.createDataFrame(spark.sparkContext.parallelize(data), StructType(schema))

    // Per (type, key) partition, ordered by timestamp:
    //   startDate = created_at of the first row in the partition
    //   endDate   = created_at of the next row if its tg differs from the current tg, otherwise null
    val windowSpec = Window.partitionBy("type", "key").orderBy("timestamp")
    test
      .withColumn("startDate", first(col("created_at")).over(windowSpec))
      .withColumn("endDate", when(
        lead(col("tg"), 1).over(windowSpec).isNotNull &&
        lead(col("tg"), 1).over(windowSpec) =!= col("tg"),
        lead(col("created_at"), 1).over(windowSpec)
      ).otherwise(lit(null).cast(IntegerType)))
      .show()

Expected output:

    type        key          tg        created_at    timestamp            row_number  startDate   endDate

    device_id   essentials   template  1600269347    2020-09-21 19:08:05  1           1600269347  null
    device_id   experiment   t1        1599721314    2020-09-17 01:37:17  1           1599721314  1600228007
    device_id   experiment   v1        1600228007    2020-09-21 18:07:53  2           1599721314  1605221085
    device_id   experiment   c1        1605221085    2020-09-21 18:07:53  3           1599721314  null
    test        t_key        t1        1599714939    2020-09-16 01:37:55  1           1599714939  1600084857
    test        t_key        t2        1600084857    2020-09-21 17:08:23  2           1599714939  null

After applying these transformations, I write the data to S3 in Parquet format.
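
A minimal sketch of that write step, assuming a placeholder S3 path and, as an assumption not stated above, a partition layout by `type` and `key`:

    // Sketch only: "result" holds the transformed DataFrame from the step above;
    // the bucket/prefix is a placeholder and the partitionBy layout is an assumption.
    val result = test
      .withColumn("startDate", first(col("created_at")).over(windowSpec))
      .withColumn("endDate", when(
        lead(col("tg"), 1).over(windowSpec).isNotNull &&
        lead(col("tg"), 1).over(windowSpec) =!= col("tg"),
        lead(col("created_at"), 1).over(windowSpec)
      ).otherwise(lit(null).cast(IntegerType)))

    result.write
      .mode("overwrite")
      .partitionBy("type", "key")            // assumed partition layout
      .parquet("s3://my-bucket/snapshots/")  // placeholder path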

How should I handle the next iteration, when a new batch of data arrives like this:

    val schema2 = List(
      StructField("type", StringType, true),
      StructField("key", StringType, true),
      StructField("tg", StringType, true),
      StructField("created_at", IntegerType, true),
      StructField("timestamp", TimestampType, true)
    )

    val data2 = Seq(
      Row("device_id", "essentials", "template", 1526472602, Timestamp.valueOf("2020-10-15 19:08:05")),
      Row("device_id", "experiment", "c1", 1727784602, Timestamp.valueOf("2020-10-10 01:37:17")),
      Row("device_id", "experiment", "x1", 1728994202, Timestamp.valueOf("2020-10-19 01:37:17"))
    )
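
The incoming batch can be turned into a DataFrame the same way as the first one; a short sketch (the name `newBatch` is illustrative, not from the original code):

    // Sketch: build the incoming batch exactly like the first DataFrame.
    val newBatch = spark.createDataFrame(spark.sparkContext.parallelize(data2), StructType(schema2))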

Right now I read the previously written data back from S3, union that DataFrame with the incoming data, apply the logic above again, then delete all the data in S3 and write the fully recomputed DataFrame. This doesn't seem efficient, since these iterations happen over and over. Is there a better approach? How can I filter only the rows that have changed and write just those to S3?
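
For reference, the current iteration looks roughly like the sketch below. The S3 paths and the names `previous`, `combined`, and `updated` are illustrative, and it assumes `row_number`, `startDate`, and `endDate` are simply recomputed from scratch over the combined data:

    // Sketch of the current full-rewrite iteration described above.
    val previous = spark.read.parquet("s3://my-bucket/snapshots/")

    // Drop the derived columns so both frames share the raw schema before the union
    // (assumption: row_number/startDate/endDate are recomputed from scratch).
    val combined = previous
      .select("type", "key", "tg", "created_at", "timestamp")
      .union(newBatch)

    // Re-apply the same window logic over the combined data.
    val windowSpec2 = Window.partitionBy("type", "key").orderBy("timestamp")
    val updated = combined
      .withColumn("row_number", row_number().over(windowSpec2))
      .withColumn("startDate", first(col("created_at")).over(windowSpec2))
      .withColumn("endDate", when(
        lead(col("tg"), 1).over(windowSpec2).isNotNull &&
        lead(col("tg"), 1).over(windowSpec2) =!= col("tg"),
        lead(col("created_at"), 1).over(windowSpec2)
      ).otherwise(lit(null).cast(IntegerType)))

    // Current approach: replace the whole dataset each iteration. In practice the result
    // has to be materialized (e.g. written to a new prefix) before the old files are deleted,
    // since Spark cannot safely overwrite a path it is still reading from.
    updated.write.mode("overwrite").partitionBy("type", "key").parquet("s3://my-bucket/snapshots_new/")

This recomputes and rewrites the entire dataset on every iteration, which is the inefficiency described above.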