How to use a external trigger to stop streaming query?

I am using spark structured streaming and I want to check if a stop file exists to exit my program.

I can do something like:

val query = SparkSession...load.writeStream.foreachBatch{
  if (stop file exist) exit(0)
  // do some processing here
}.start()
query.awaitTermination()

However, this could only be triggered if there are new rows appended to this query table. If there are no new rows, the stop file would have no impact whatsoever.

Any better idea to implement this trigger?

1 answer

  • answered 2019-12-15 11:16 Jacek Laskowski

    Any better idea to implement this trigger?

    A streaming query is a separate daemon thread of a Structured Streaming application. It runs forever until it is stopped using StreamingQuery.stop.

    There are at least two ways to access a running streaming query:

    1. DataStreamWriter.start()
    2. StreamingQueryManager

    The idea is to have a "control thread" in your Structured Streaming application that would listen to stop requests (with the ID of the streaming query or queries) and simply execute stop on the running streaming queries.


    Think of a Spark Structured Streaming application as a single-JVM application with multiple threads. You can have one more to control the threads. That's the basic idea.