Flink Streaming RichSource early stops

It runs with processing time and using a broadcast state.


        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);


    BroadcastStream<List<TableOperations>> broadcastOperationsState = env
                .addSource(new LoadCassandraOperations(10000L, cassandraHost, cassandraPort)).broadcast(descriptor);

        SingleOutputStreamOperator<InternalVariableValue> stream = 
                env.addSource(new SourceMillisInternalVariableValue(5000L));

        
        SingleOutputStreamOperator<InternalVariableOperation> streamProcessed = 
                stream
                .keyBy(InternalVariableValue::getUuid)
                .connect(broadcastOperationsState)
                .process(new AddOperationInfo())
                ;

        

streamProcessed.print();


SourceMillisIntervalVariableValues create a event every 5s . The events are stored in a static collection. The run method looks like :


    public class SourceMillisInternalVariableValue extends RichSourceFunction<InternalVariableValue>{
    
    private boolean running;
    
    long millis;

    public SourceMillisInternalVariableValue(long millis) {
        super();
        this.millis = millis;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        running = true;
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void run(SourceContext<InternalVariableValue> ctx) throws Exception {        
        //Espera inicial
        Thread.sleep(1500);
        PojoVariableValues[] pojoData =
                 new PojoVariableValues[]{
                  new PojoVariableValues("id1", "1"),
                  new PojoVariableValues("id2", "2"),
                 ....
                 ....
                  new PojoVariableValues("id21", "21")
                         };
        
         int cont = 0;
         while (cont<pojoData.length) {
            System.out.println("Iteration "+cont+" "+pojoData.length);
            ctx.collect(generateVar(pojoData[0+cont].getUUID(), pojoData[0+cont].getValue()));      
            ctx.collect(generateVar(pojoData[1+cont].getUUID(), pojoData[1+cont].getValue()));      
            ctx.collect(generateVar(pojoData[2+cont].getUUID(), pojoData[2+cont].getValue()));      
            cont = cont +3;
            Thread.sleep(millis);       
        }
        
    }
    
    private InternalVariableValue generateVar(String uuid, String value)
    {
        return InternalVariableValueMessage.InternalVariableValue.newBuilder()
                .setUuid(uuid)
                .setTimestamp(new Date().getTime()).setValue(value).setKeyspace("nest").build();
    }
    
    class PojoVariableValues {
        private String UUID;
        private String Value;
        
        public PojoVariableValues(String uUID, String value) {
            super();
            UUID = uUID;
            Value = value;
        }
        
        public String getUUID() {
            return UUID;
        }
        public void setUUID(String uUID) {
            UUID = uUID;
        }
        public String getValue() {
            return Value;
        }
        public void setValue(String value) {
            Value = value;
        }
        
       }
}

LoadCassandraOperations emits events every 10 seconds. It works fine.

When I run this code, SourceMillisIntervalVariableValues stops in the first iteration, emiting only three events. If I comment the process function, both sources works properly, but if I run the process , the source is cancel...

I spect than the source emits all events ( 21 exactly ) , and all of them are processing in the aggregate function. If I run this code, the while loop in the sources only complete one iteration.

Any idea ?

Thank youuu . cheers

EDIT:

Important. This code is for explore the processint time and broadcast feature. I know that I'm not using the best practices in the sources. Thanks

EDIT 2: The problem starts when I try to run the process function.