Flink streaming RichSourceFunction stops early

The job runs with processing time and uses broadcast state.

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    BroadcastStream<List<TableOperations>> broadcastOperationsState = env
            .addSource(new LoadCassandraOperations(10000L, cassandraHost, cassandraPort))
            .broadcast(descriptor);

    SingleOutputStreamOperator<InternalVariableValue> stream =
            env.addSource(new SourceMillisInternalVariableValue(5000L));

    SingleOutputStreamOperator<InternalVariableOperation> streamProcessed = stream
            .connect(broadcastOperationsState) // assumed: this call is missing from the post as written
            .process(new AddOperationInfo());



SourceMillisInternalVariableValue creates an event every 5 s. The events are stored in a static collection. The source class looks like this:

    public class SourceMillisInternalVariableValue extends RichSourceFunction<InternalVariableValue> {
        private boolean running;
        long millis;

        public SourceMillisInternalVariableValue(long millis) {
            this.millis = millis;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            running = true;
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public void run(SourceContext<InternalVariableValue> ctx) throws Exception {
            // Initial wait
            PojoVariableValues[] pojoData =
                    new PojoVariableValues[] {
                        new PojoVariableValues("id1", "1"),
                        new PojoVariableValues("id2", "2"),
                        new PojoVariableValues("id21", "21")
                    };
            int cont = 0;
            // Emits three events per iteration
            while (cont < pojoData.length) {
                System.out.println("Iteration " + cont + " " + pojoData.length);
                ctx.collect(generateVar(pojoData[cont].getUUID(), pojoData[cont].getValue()));
                ctx.collect(generateVar(pojoData[cont + 1].getUUID(), pojoData[cont + 1].getValue()));
                ctx.collect(generateVar(pojoData[cont + 2].getUUID(), pojoData[cont + 2].getValue()));
                cont = cont + 3;
            }
        }

        // Note: the uuid argument is not set on the builder in this snippet
        private InternalVariableValue generateVar(String uuid, String value) {
            return InternalVariableValueMessage.InternalVariableValue.newBuilder()
                    .setTimestamp(new Date().getTime()).setValue(value).setKeyspace("nest").build();
        }

        class PojoVariableValues {
            private String UUID;
            private String Value;

            public PojoVariableValues(String uUID, String value) {
                UUID = uUID;
                Value = value;
            }

            public String getUUID() {
                return UUID;
            }

            public void setUUID(String uUID) {
                UUID = uUID;
            }

            public String getValue() {
                return Value;
            }

            public void setValue(String value) {
                Value = value;
            }
        }
    }
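
For comparison, here is a minimal, Flink-free sketch of the emit loop as I understand it should behave: it emits in batches of three, checks a running flag so that cancel() can stop it, and pauses between batches. PacedEmitter, its fields, and the Consumer standing in for ctx.collect(...) are all hypothetical names for illustration; this is not the actual source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Flink-free stand-in for the source loop; every name here is hypothetical. */
public class PacedEmitter {
    // volatile so that a cancel() from another thread is seen by the loop
    private volatile boolean running = true;
    private final long pauseMillis;
    private final int batchSize;

    public PacedEmitter(long pauseMillis, int batchSize) {
        this.pauseMillis = pauseMillis;
        this.batchSize = batchSize;
    }

    public void cancel() {
        running = false;
    }

    /** Emits the values in batches and returns the number of batches emitted. */
    public int run(List<String> values, Consumer<String> collector) throws InterruptedException {
        int batches = 0;
        for (int start = 0; running && start < values.size(); start += batchSize) {
            // Clamp the end index so a short final batch cannot run past the list
            int end = Math.min(start + batchSize, values.size());
            for (String value : values.subList(start, end)) {
                collector.accept(value); // plays the role of ctx.collect(...)
            }
            batches++;
            if (end < values.size()) {
                Thread.sleep(pauseMillis); // pause only between batches
            }
        }
        return batches;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> values = new ArrayList<>();
        for (int i = 1; i <= 21; i++) {
            values.add("id" + i);
        }
        List<String> emitted = new ArrayList<>();
        int batches = new PacedEmitter(10L, 3).run(values, emitted::add);
        System.out.println(batches + " batches, " + emitted.size() + " events");
    }
}
```

Calling cancel() before or during run(...) makes the loop exit at the next batch boundary, which is the role the running flag is meant to play in the real source.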

LoadCassandraOperations emits events every 10 seconds. It works fine.

When I run this code, SourceMillisInternalVariableValue stops in the first iteration, emitting only three events. If I comment out the process function, both sources work properly, but if I run the process function, the source is cancelled.

I expect the source to emit all events (21 exactly) and all of them to be processed in the aggregate function. When I run this code, the while loop in the source completes only one iteration.
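
To make the expected count concrete, here is a small sketch of the loop arithmetic on its own, independent of Flink. countIterations is a hypothetical helper that mirrors the `cont = cont + 3` loop: with 21 entries it should run 7 times, while with only the three entries shown in the listing it would run once.

```java
public class LoopIterations {
    /** Counts passes of: cont = 0; while (cont < length) { ...; cont = cont + step; } */
    static int countIterations(int length, int step) {
        int iterations = 0;
        int cont = 0;
        while (cont < length) {
            iterations++;
            cont = cont + step;
        }
        return iterations;
    }

    public static void main(String[] args) {
        System.out.println(countIterations(21, 3)); // 21 entries, 3 per pass
        System.out.println(countIterations(3, 3));  // only the 3 entries shown
    }
}
```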

Any ideas?

Thank you. Cheers.


Important: this code is only for exploring the processing time and broadcast features. I know that I'm not following best practices in the sources. Thanks.

EDIT 2: The problem starts when I try to run the process function.