This Akka stream sometimes doesn't finish

I have a graph that reads lines from multiple gzipped files and writes those lines to another set of gzipped files, mapped according to some value in each line.

It works correctly against small data sets, but fails to terminate on larger data. (It may not be the size of the data that's to blame, as I have not run it enough times to be sure - it takes a while).

def files: Source[File, NotUsed] =
  Source.fromIterator(
    () =>
      Files
        .fileTraverser()
        .breadthFirst(inDir)
        .asScala
        .filter(_.getName.endsWith(".gz"))
        .toIterator)

def extract =
  Flow[File]
    .mapConcat[String](unzip)
    .mapConcat(s =>
      (JsonMethods.parse(s) \ "tk").extract[Array[String]].map(_ -> s).to[collection.immutable.Iterable])
    .groupBy(1 << 16, _._1)
    .groupedWithin(1000, 1.second)
    .map { lines =>
      val w = writer(lines.head._1)
      w.println(lines.map(_._2).mkString("\n"))
      w.close()
      Done
    }
    .mergeSubstreams

def unzip(f: File) = {
  scala.io.Source
    .fromInputStream(new GZIPInputStream(new FileInputStream(f)))
    .getLines
    .toIterable
    .to[collection.immutable.Iterable]
}

def writer(tk: String): PrintWriter =
  new PrintWriter(
    new OutputStreamWriter(
      new GZIPOutputStream(
        new FileOutputStream(new File(outDir, s"$tk.json.gz"), true)
      ))
  )

val process = files.via(extract).toMat(Sink.ignore)(Keep.right).run()

Await.result(process, Duration.Inf)

The thread dump shows that the process is WAITING at Await.result(process, Duration.Inf) and nothing else is happening.

OpenJDK v11 with Akka v2.5.15