Error while calling a class which implements Callback interface, for Spark based Kafka Producer

I am running a spark based Kafka producer, wherein I am trying to print success messages while calling a class, that implements CallBack interface, in Kafka Producer. Code for the same is:

import java.text.SimpleDateFormat
import java.util.{Calendar, Properties}

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.sql.SparkSession

    object KafkaProducer extends Serializable{
    
      def main (args: Array[String]): Unit = {
        val spark = new SparkSession.Builder().enableHiveSupport().getOrCreate()
        val topic = "ktopic"
        val props = new Properties()
    
        props.put(ProducerConfig.CLIENT_ID_CONFIG,"ID_1")
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"server")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer].getName)
        spark.sql("query").na.fill("").repartition(2).foreachPartition(rows => {
          val prod = new KafkaProducer[String, String](props)
          rows.foreach(row => {
            val key = row.getAs(0).toString
            val value = row.getAs(1).toString
            println(s" key: $key")
            val callback = new ProducerCallBack(key)
            val message = new ProducerRecord(topic, key, value)
            val metadata = prod.send(message,callback).get()
            println(s"offset ${metadata.offset()}")    
          }
          )
          prod.flush()
          prod.close()
        }
        )
      }
    }
    
    class ProducerCallBack(key:String) extends Callback with Serializable {
      override def onCompletion(metadata: RecordMetadata, e: Exception): Unit ={
        if(e != null) {
          println(s"Got error message ${e.getMessage} for $key")
        }else{
          println(s"Success for $key, against ${metadata.offset()}")  
        }
      }
    }

The code does ship the message to Kafka topic and returns metadata. However, it doesn't execute code for producerCallBack class and throws below error, which has been taken from yarn logs:

11:59:54 ERROR ProducerBatch: Error executing user-provided callback on message for topic-partition 'ktopic-0'
java.lang.NullPointerException
        at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1422)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:201)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
        at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:591)
        at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:567)
        at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
        at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
        at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
        at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
        at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
        at java.lang.Thread.run(Thread.java:748)

If anybody has any idea about this error, and thus could help with resolving it?