Which threading mechanism to use for tasks that enqueue other tasks?

I have a task that creates other tasks. Those tasks may in turn create further tasks, or not. I don't know beforehand how many tasks will be created in total. At some point no more tasks are created, and all the tasks finish.

When the last task is done, I must do some extra stuff.

Which threading mechanism should be used? I've read about CountDownLatch, CyclicBarrier and Phaser, but none seems to fit.

I've also tried ExecutorService, but I ran into problems, such as having no way to run something once all the tasks have finished. Here is my attempt:

import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class Issue {

  public static void main(String[] args) throws InterruptedException {
    var count = new AtomicInteger(1);
    var executor = Executors.newFixedThreadPool(3);
    class Task implements Runnable {
      final int id = count.getAndIncrement();
      @Override
      public void run() {
        try {
          MILLISECONDS.sleep((long)(Math.random() * 1000L + 1000L));
        } catch (InterruptedException e) {
          // Do nothing
        }
        if (id < 5) {
          executor.submit(new Task());
          executor.submit(new Task());
        }
        System.out.println(id);
      }
    }
    executor.execute(new Task());
    executor.shutdown();
//    executor.awaitTermination(20, TimeUnit.SECONDS);
    System.out.println("Hello");
  }
}

This throws a RejectedExecutionException because tasks are submitted after shutdown() has been called. The expected output would be something like:

1
2
3
4
5
6
7
8
9
Hello

Which threading mechanism can help me do that?

2 answers

  • answered 2021-06-10 11:14 Michael

    It seems pretty tricky. If there is even a single task that's either in the queue or currently executing, then since you can't say whether or not it will spawn another task, you have no way to know how long it may run for. It may be the start of a chain of tasks that takes another 2 hours.

    I think all the information you'd need to achieve this is encapsulated by the executor implementations. You need to know what's running and what's in the queue.

    I think you're unfortunately looking at having to write your own executor. It needn't be complicated and it doesn't have to conform to the JDK's interfaces if you don't want it to. Just something that maintains a thread pool and a queue of tasks. Add the ability to attach listeners to the executor. When the queue is empty and there are no actively executing tasks then you can notify the listeners.

    Here's a quick code sketch.

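    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicLong;

    class MyExecutor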
    {
        private final AtomicLong taskId = new AtomicLong();
        private final Map<Long, Runnable> idToQueuedTask = new ConcurrentHashMap<>();
        private final AtomicLong runningTasks  = new AtomicLong();
        private final ExecutorService delegate = Executors.newFixedThreadPool(3);
    
        public void submit(Runnable task) {
            long id = taskId.incrementAndGet();
            final Runnable wrapped = () -> {
                taskStarted(id);
                try {
                    task.run();
                }
                finally {
                    taskEnded();
                }
            };
            idToQueuedTask.put(id, wrapped);
            delegate.submit(wrapped);
        }
        
        private void taskStarted(long id) {
            idToQueuedTask.remove(id);
            runningTasks.incrementAndGet();
        }
        
        private void taskEnded() {
            final long numRunning = runningTasks.decrementAndGet();
            if (numRunning == 0 && idToQueuedTask.isEmpty()) {
                System.out.println("Done, time to notify listeners");
            }
        }
        
        public static void main(String[] args) {
            MyExecutor executor = new MyExecutor();
            executor.submit(() -> {
                System.out.println("Parent task");
                
                try {
                    Thread.sleep(1000);
                }
                catch (Exception e) {}
                
                executor.submit(() -> {
                    System.out.println("Child task");
                });
            });
        }
    }
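
A variant of the same idea, offered only as a sketch: if a task is counted at submit time rather than when it starts, a single counter covers both queued and running tasks, and a CountDownLatch lets the caller block until that counter reaches zero. Counting at submit time also avoids the brief window in the sketch above where a task has been removed from idToQueuedTask but is not yet reflected in runningTasks. The names TrackingExecutor, pending, awaitCompletion and runDemo are made up for this illustration, and the sketch assumes that new tasks are only submitted before waiting starts or from inside a running task.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration; not a JDK class.
class TrackingExecutor {
    private final ExecutorService delegate = Executors.newFixedThreadPool(3);
    // Tasks that have been submitted but have not finished (queued or running).
    private final AtomicInteger pending = new AtomicInteger();
    // Opened once, when pending drops to zero.
    private final CountDownLatch done = new CountDownLatch(1);

    public void submit(Runnable task) {
        pending.incrementAndGet(); // count the task before it can start
        delegate.execute(() -> {
            try {
                task.run();
            } finally {
                // A parent is still counted while it submits children,
                // so zero here means nothing is queued or running.
                if (pending.decrementAndGet() == 0) {
                    done.countDown();
                }
            }
        });
    }

    // Blocks until no task is queued or running, then shuts the pool down.
    public void awaitCompletion() {
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } finally {
            delegate.shutdown();
        }
    }

    // Mirrors the task tree from the question and returns how many tasks ran.
    static int runDemo() {
        TrackingExecutor executor = new TrackingExecutor();
        AtomicInteger ids = new AtomicInteger(1);
        AtomicInteger finished = new AtomicInteger();
        class Task implements Runnable {
            final int id = ids.getAndIncrement();
            @Override
            public void run() {
                if (id < 5) {
                    executor.submit(new Task());
                    executor.submit(new Task());
                }
                finished.incrementAndGet();
            }
        }
        executor.submit(new Task());
        executor.awaitCompletion();
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo() + " tasks finished");
        System.out.println("Hello");
    }
}
```

The latch is one-shot, so an instance like this is meant to be used for a single batch of work. A listener list, as described above, could be notified at the same point where the latch is opened.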
    

  • answered 2021-06-10 12:50 Sascha

    If you change your ExecutorService to this:

    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
    

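    You could then poll the executor's task counters until every scheduled task has completed. Note that the Javadoc describes both counters as approximations, so treat this as a best-effort check: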

        while(executor.getTaskCount() > executor.getCompletedTaskCount())
        {
            TimeUnit.SECONDS.sleep(10L);
        }
        executor.shutdown();
        System.out.println("Hello");
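
For comparison, here is a sketch that waits without polling, assuming a Phaser (one of the classes mentioned in the question) is acceptable. Each task registers with the phaser when it is created and deregisters when it finishes, while the waiting thread holds one registration of its own, so arriveAndAwaitAdvance() returns only once no registered task remains. The class name PhaserDemo and the method runAll are made up for this illustration, and the sleep from the question's task is left out to keep it short.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class for illustration only.
class PhaserDemo {

    // Runs the task tree from the question and returns how many tasks finished.
    static int runAll() {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        Phaser phaser = new Phaser(1); // one party for the waiting thread
        AtomicInteger ids = new AtomicInteger(1);
        AtomicInteger finished = new AtomicInteger();

        class Task implements Runnable {
            final int id = ids.getAndIncrement();

            Task() {
                // Register at creation time, i.e. while the parent task
                // (or the main thread) is itself still registered.
                phaser.register();
            }

            @Override
            public void run() {
                try {
                    if (id < 5) {
                        executor.execute(new Task());
                        executor.execute(new Task());
                    }
                    finished.incrementAndGet();
                } finally {
                    // Signal completion and drop this task's registration.
                    phaser.arriveAndDeregister();
                }
            }
        }

        executor.execute(new Task());
        // Returns once every registered task has arrived and deregistered.
        phaser.arriveAndAwaitAdvance();
        executor.shutdown();
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll() + " tasks finished");
        System.out.println("Hello");
    }
}
```

Because children are registered before their parent deregisters, the number of unarrived parties cannot reach zero while work is still outstanding, which is the property the polling loop approximates.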