PySpark Structured Streaming with Kafka Issue on groupBy

I am trying to find duplicate timestamps and flag them in Structured Streaming with Kafka. To get the count of each timestamp, I do a groupBy on the timestamp column. As it turns out, aggregation without a watermark is not supported in append mode, so I added a watermark on the date_time column (timestamp type). The code runs without errors, but it never writes any result to Kafka.

from pyspark.sql.functions import count, concat, col, lit, window

spark.conf.set("spark.sql.shuffle.partitions", "1")

groupedDF = readStream \
    .withWatermark("date_time", "3 seconds") \
    .groupBy("date_time", "id") \
    .agg((count("value") > 1).cast("int").alias("duplicate_flag"))
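In batch terms, the flag I am after is simply "does this (date_time, id) pair occur more than once". A plain-Python sketch of that logic, with made-up sample rows:

```python
from collections import Counter

# hypothetical sample rows: (date_time, id, value)
rows = [
    ("2023-01-01 00:00:01", "a", 10),
    ("2023-01-01 00:00:01", "a", 11),  # duplicate timestamp for id "a"
    ("2023-01-01 00:00:02", "b", 12),
]

# count occurrences of each (date_time, id) pair,
# then flag every row whose pair was seen more than once
counts = Counter((dt, i) for dt, i, _ in rows)
flagged = [(dt, i, v, int(counts[(dt, i)] > 1)) for dt, i, v in rows]
print(flagged)
```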

# join the flags back onto the stream on the grouping keys
finalDF = readStream.join(groupedDF, ["date_time", "id"])

outputStream = finalDF \
    .select(concat(col("date_time"), lit(','), col("id"), lit(','), col("value"), lit(','), col("duplicate_flag")).alias("value"))

outputStream \
    .selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .outputMode("append") \
    .trigger(processingTime='3 seconds') \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") \
    .option("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") \
    .option("fetchOffset.numRetries", 3) \
    .option("topic", "topic_test") \
    .option("checkpointLocation", "/checkpoint") \
    .start()
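For clarity, the value I expect Kafka to receive for each row is the comma-joined fields, mirroring the concat(...) select above (a plain-Python sketch; the field values are made up):

```python
# build the expected message value by comma-joining the row fields,
# in the same order as the concat(...) select: date_time, id, value, duplicate_flag
date_time, row_id, value, duplicate_flag = "2023-01-01 00:00:01", "a", 10, 1
message = ",".join(str(f) for f in (date_time, row_id, value, duplicate_flag))
print(message)  # 2023-01-01 00:00:01,a,10,1
```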

I also tried using a window function in groupedDF, as below, but the output is erratic: sometimes it is missing, sometimes it is the output of an old batch, and sometimes there is a latency of more than 15 seconds, even though the watermark, window duration, and trigger interval are all 3 seconds.

groupedDF = readStream \
    .withWatermark("date_time", "3 seconds") \
    .groupBy(window("date_time", "3 seconds"), "id") \
    .agg((count("value") > 1).cast("int").alias("duplicate_flag"))

How can I solve this problem?