Kinesis Spring binder: why did the number of writes to SpringIntegrationLockRegistry triple after upgrading to 2.1.0?
We recently updated the Spring Kinesis binder from 2.0.1.RELEASE to 2.1.0 and the number of writes against the DynamoDB table SpringIntegrationLockRegistry tripled. Does anyone know what changed in this library that would cause this?
Thanks.
1 answer
-
answered 2022-05-04 14:31
Artem Bilan
I think this commit introduced the relevant change: https://github.com/spring-projects/spring-integration-aws/commit/ac74dfd2368c5c4b74793c259312313ad21ed5f8.
So, we now renew the lock every time we are ready to consume. If the lock cannot be acquired at runtime, we are not the lock holder and therefore don't consume.
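Roughly, the behavior described above can be pictured like this. This is only an illustrative sketch, not the binder's actual code; ShardLockRegistry, obtain, renew and consumeShard are hypothetical names standing in for the real lock-registry interactions.

import java.util.concurrent.locks.Lock;

public class ShardConsumeLoopSketch {

    // Hypothetical stand-in for the DynamoDB-backed lock registry.
    interface ShardLockRegistry {
        Lock obtain(String shardKey);   // returns a lock bound to one shard
        void renew(String shardKey);    // refreshes the lease (a DynamoDB write)
    }

    private final ShardLockRegistry lockRegistry;

    ShardConsumeLoopSketch(ShardLockRegistry lockRegistry) {
        this.lockRegistry = lockRegistry;
    }

    void consumeCycle(String shardKey) {
        Lock lock = lockRegistry.obtain(shardKey);
        if (lock.tryLock()) {                    // are we still the lock holder?
            try {
                lockRegistry.renew(shardKey);    // renewed on every consume cycle -> extra writes
                consumeShard(shardKey);
            }
            finally {
                lock.unlock();
            }
        }
        // else: not the lock holder at the moment, so this instance skips the shard
    }

    void consumeShard(String shardKey) {
        // read records from the shard (omitted)
    }
}

Because the lock is now touched on every consume cycle rather than only when it is first acquired, additional write activity against the lock table is to be expected.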
See also questions close to this topic
-
Spring cloud streams kafka streams state store
I want to get a state store with a custom key and value.
I have a Kafka topic example-kafka-topic-event.
This is how I get the KTable at the code level:
@Component("example") public class DeputyEventListener implements BiConsumer<KStream<String, String>, KTable<String, String>> { @Override public void accept(KStream<String, String> deputyEventStream, KTable<String, String> deputyEntity) { deputyEntity.toStream() .peek((key, value) -> System.out.println("IN TABLE VALUES IS + " + value + " ON KEY " + key)); } }
With the following property:
spring.cloud.stream.bindings.example-in-1:
  destination: example-kafka-topic-event
  consumer:
    materializedAs: example-state-store
I don't want to use the key and value exactly as they are in the Kafka topic. I want to convert the record and use another field as the KTable key. How can I do this? Is there any way to do such a conversion without losing the convenience of Spring Cloud Stream?
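One possible approach, shown only as a sketch and not necessarily the idiomatic Spring Cloud Stream answer: consume the topic as a plain KStream, re-key it with the field you want, and materialize the re-keyed stream as a table. It assumes the binding is switched to a single KStream input; extractKeyField is a hypothetical helper standing in for however you pull the desired field out of the value.

import java.util.function.Consumer;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.springframework.stereotype.Component;

@Component("example")
public class DeputyEventListener implements Consumer<KStream<String, String>> {

    @Override
    public void accept(KStream<String, String> deputyEventStream) {
        deputyEventStream
                // re-key each record using a field extracted from the value
                .selectKey((oldKey, value) -> extractKeyField(value))
                // materialize the re-keyed stream as a KTable backed by a named state store
                .toTable(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("example-state-store"))
                .toStream()
                .peek((key, value) -> System.out.println("IN TABLE VALUE IS " + value + " ON KEY " + key));
    }

    private String extractKeyField(String value) {
        // hypothetical: parse the payload and return the field to use as the key
        return value;
    }
}

Note that re-keying with selectKey causes Kafka Streams to repartition the records through an internal topic before the store is materialized, so the state store ends up keyed by the extracted field.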
-
After migrating from Spring Boot 2.6.2 to 2.6.6, getting org.springframework.context.ApplicationContextException
Error logs:
org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is java.lang.NoSuchMethodError: java.util.List.of(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/util/List;
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.3.18.jar:5.3.18]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.18.jar:5.3.18]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.18.jar:5.3.18]
    at java.lang.Iterable.forEach(Iterable.java:75) ~[na:1.8.0_202]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.18.jar:5.3.18]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.18.jar:5.3.18]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.18.jar:5.3.18]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.18.jar:5.3.18]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145) ~[spring-boot-2.6.6.jar:2.6.6]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:740) [spring-boot-2.6.6.jar:2.6.6]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:415) [spring-boot-2.6.6.jar:2.6.6]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) [spring-boot-2.6.6.jar:2.6.6]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1312) [spring-boot-2.6.6.jar:2.6.6]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301) [spring-boot-2.6.6.jar:2.6.6]
    at com.walmart.ebs.people.system.payroll.data.sync.SpringBootApplicationStarter.main(SpringBootApplicationStarter.java:19) [classes/:na]
Caused by: java.lang.NoSuchMethodError: java.util.List.of(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/util/List;
    at org.springframework.integration.support.json.JacksonJsonUtils.(JacksonJsonUtils.java:58) ~[spring-integration-core-5.5.10.jar:5.5.10]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.(KafkaMessageDrivenChannelAdapter.java:139) ~[spring-integration-kafka-5.5.10.jar:5.5.10]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:735) ~[spring-cloud-stream-binder-kafka-3.2.3.jar:3.2.3]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:163) ~[spring-cloud-stream-binder-kafka-3.2.3.jar:3.2.3]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:426) ~[spring-cloud-stream-3.2.3.jar:3.2.3]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:92) ~[spring-cloud-stream-3.2.3.jar:3.2.3]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:143) ~[spring-cloud-stream-3.2.3.jar:3.2.3]
    at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:184) ~[spring-cloud-stream-3.2.3.jar:3.2.3]
    at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:137) ~[spring-cloud-stream-3.2.3.jar:3.2.3]
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:118) ~[spring-cloud-stream-3.2.3.jar:3.2.3]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58) ~[spring-cloud-stream-3.2.3.jar:3.2.3]
    at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:1.8.0_202]
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57) ~[spring-cloud-stream-3.2.3.jar:3.2.3]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.start(InputBindingLifecycle.java:34) ~[spring-cloud-stream-3.2.3.jar:3.2.3]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.18.jar:5.3.18]
    ... 14 common frames omitted
Process finished with exit code 1
-
How to write a unit test for a Spring Cloud Stream function-based method?
When I try to test a Spring Cloud Stream function-based method, I always get a NullPointerException from InputDestination.
I have two questions:
- It's hard for me to work out how to write a unit test from the official doc (official test doc).
- Besides, how do I write an integration test when the test class has some dependencies? It seems to create a new context and always fails with a NoSuchBeanDefinition error.
I have tried the following, but the context cannot find some of the dependency beans.
@Test
public void sampleTest() {
    try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
            TestChannelBinderConfiguration.getCompleteConfiguration(MyTestConfiguration.class))
            .run("--spring.cloud.function.definition=uppercase")) {
        InputDestination source = context.getBean(InputDestination.class);
        OutputDestination target = context.getBean(OutputDestination.class);
        source.send(new GenericMessage<byte[]>("hello".getBytes()));
        assertThat(target.receive().getPayload()).isEqualTo("HELLO".getBytes());
    }
}
So I just want to write a unit test, but I still get the NPE.
Here is my code.
@Bean
public Function<Message<List<DemoBean>>, Message<DemoBean>> findFirstBean() {
    return message -> {
        List<DemoBean> demoBeans = message.getPayload();
        return MessageBuilder.withPayload(demoBeans.get(0)).build();
    };
}
Here is my test.
@SpringBootTest @ActiveProfiles(profiles = "local") @Import({ TestChannelBinderConfiguration.class}) class FunctionDemoTest { @Autowired private InputDestination inputDestination; @Autowired private OutputDestination outputDestination; private FunctionDemo functionDemo; // some dependency need to mock private DemoService demoService; @BeforeEach void setUp() { demoService = Mockito.mock( DemoService.class ); functionDemo = new FunctionDemo( demoService); } @Test public void findFirstBeanTest() { DemoBean demoBean = new DemoBean(); demoBean.setName("Howard"); demoBean.setAge( 1 ); DemoBean demoBean1 = new DemoBean(); demoBean1.setName("Frank"); demoBean1.setAge( 2 ); List<DemoBean> demoBeanList = new ArrayList<>(); demoBeanList.add( demoBean ); demoBeanList.add( demoBean1 ); Message<List<DemoBean>> inputMessage = MessageBuilder.withPayload(demoBeanList).build(); inputDestination.send(inputMessage,"findFirstBean-in-0"); Assertions.assertNotNull( outputDestination.receive( 10000, "findFirstBean-out-0") ); } }
Here is the error:
java.lang.NullPointerException: while trying to invoke the method org.springframework.messaging.SubscribableChannel.send(org.springframework.messaging.Message) of a null object returned from org.springframework.cloud.stream.binder.test.InputDestination.getChannelByName(java.lang.String)
    at org.springframework.cloud.stream.binder.test.InputDestination.send(InputDestination.java:89)
    at com.successfactors.caf.listener.FunctionDemoTest.raePdrResultProcessor(FunctionDemoTest.java:82)
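For what it's worth, here is a sketch that combines the two snippets above, following only the pattern already shown in the first sample test (nothing here is verified against your project): bootstrap the test binder with the configuration class that declares findFirstBean and set spring.cloud.function.definition accordingly. FunctionDemoConfiguration is an assumed name for that configuration class, and the mocked DemoService would still need to be contributed to this context, for example from a test @Configuration.

@Test
public void findFirstBeanTest() {
    try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
            TestChannelBinderConfiguration.getCompleteConfiguration(FunctionDemoConfiguration.class))
            .run("--spring.cloud.function.definition=findFirstBean")) {

        InputDestination source = context.getBean(InputDestination.class);
        OutputDestination target = context.getBean(OutputDestination.class);

        DemoBean demoBean = new DemoBean();
        demoBean.setName("Howard");
        demoBean.setAge(1);

        List<DemoBean> demoBeanList = new ArrayList<>();
        demoBeanList.add(demoBean);

        // binding names follow the <functionName>-in-0 / -out-0 convention used above
        source.send(MessageBuilder.withPayload(demoBeanList).build(), "findFirstBean-in-0");

        Assertions.assertNotNull(target.receive(10000, "findFirstBean-out-0"));
    }
}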
-
Error with StreamIdentifier when using MultiStreamTracker in Kinesis
I'm getting an error with StreamIdentifier when trying to use MultiStreamTracker in a Kinesis consumer application.
java.lang.IllegalArgumentException: Unable to deserialize StreamIdentifier from first-stream-name
What is causing this error? I can't find a good example of using the tracker with kinesis.
The stream name works when using a consumer with a single stream, so I'm not sure what is happening. It looks like the consumer is trying to parse the accountId and streamCreationEpoch, but when I create the identifiers I am using the singleStreamInstance method. Is the stream name required to have these values? They appear to be optional from the code. This test is part of a complete example on GitHub.
package kinesis.localstack.example;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy;
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.NoLeaseDeletionStrategy;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.testcontainers.containers.localstack.LocalStackContainer.Service.CLOUDWATCH;
import static org.testcontainers.containers.localstack.LocalStackContainer.Service.DYNAMODB;
import static org.testcontainers.containers.localstack.LocalStackContainer.Service.KINESIS;
import static software.amazon.kinesis.common.InitialPositionInStream.TRIM_HORIZON;
import static software.amazon.kinesis.common.StreamIdentifier.singleStreamInstance;

@Testcontainers
public class KinesisMultiStreamTest {

    static class TestProcessorFactory implements ShardRecordProcessorFactory {

        private final TestKinesisRecordService service;

        public TestProcessorFactory(TestKinesisRecordService service) {
            this.service = service;
        }

        @Override
        public ShardRecordProcessor shardRecordProcessor() {
            throw new UnsupportedOperationException("must have streamIdentifier");
        }

        public ShardRecordProcessor shardRecordProcessor(StreamIdentifier streamIdentifier) {
            return new TestRecordProcessor(service, streamIdentifier);
        }
    }

    static class TestRecordProcessor implements ShardRecordProcessor {

        public final TestKinesisRecordService service;
        public final StreamIdentifier streamIdentifier;

        public TestRecordProcessor(TestKinesisRecordService service, StreamIdentifier streamIdentifier) {
            this.service = service;
            this.streamIdentifier = streamIdentifier;
        }

        @Override
        public void initialize(InitializationInput initializationInput) {
        }

        @Override
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            service.addRecord(streamIdentifier, processRecordsInput);
        }

        @Override
        public void leaseLost(LeaseLostInput leaseLostInput) {
        }

        @Override
        public void shardEnded(ShardEndedInput shardEndedInput) {
            try {
                shardEndedInput.checkpointer().checkpoint();
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }

        @Override
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        }
    }

    static class TestKinesisRecordService {

        private List<ProcessRecordsInput> firstStreamRecords = Collections.synchronizedList(new ArrayList<>());
        private List<ProcessRecordsInput> secondStreamRecords = Collections.synchronizedList(new ArrayList<>());

        public void addRecord(StreamIdentifier streamIdentifier, ProcessRecordsInput processRecordsInput) {
            if (streamIdentifier.streamName().contains(firstStreamName)) {
                firstStreamRecords.add(processRecordsInput);
            } else if (streamIdentifier.streamName().contains(secondStreamName)) {
                secondStreamRecords.add(processRecordsInput);
            } else {
                throw new IllegalStateException("no list for stream " + streamIdentifier);
            }
        }

        public List<ProcessRecordsInput> getFirstStreamRecords() {
            return Collections.unmodifiableList(firstStreamRecords);
        }

        public List<ProcessRecordsInput> getSecondStreamRecords() {
            return Collections.unmodifiableList(secondStreamRecords);
        }
    }

    public static final String firstStreamName = "first-stream-name";
    public static final String secondStreamName = "second-stream-name";
    public static final String partitionKey = "partition-key";

    DockerImageName localstackImage = DockerImageName.parse("localstack/localstack:latest");

    @Container
    public LocalStackContainer localstack = new LocalStackContainer(localstackImage)
            .withServices(KINESIS, CLOUDWATCH)
            .withEnv("KINESIS_INITIALIZE_STREAMS", firstStreamName + ":1," + secondStreamName + ":1");

    public Scheduler scheduler;
    public TestKinesisRecordService service = new TestKinesisRecordService();
    public KinesisProducer producer;

    @BeforeEach
    void setup() {
        KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(
                KinesisAsyncClient.builder()
                        .endpointOverride(localstack.getEndpointOverride(KINESIS))
                        .region(Region.of(localstack.getRegion()))
        );
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder()
                .region(Region.of(localstack.getRegion()))
                .endpointOverride(localstack.getEndpointOverride(DYNAMODB))
                .build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder()
                .region(Region.of(localstack.getRegion()))
                .endpointOverride(localstack.getEndpointOverride(CLOUDWATCH))
                .build();

        MultiStreamTracker tracker = new MultiStreamTracker() {
            private List<StreamConfig> configs = List.of(
                    new StreamConfig(singleStreamInstance(firstStreamName),
                            InitialPositionInStreamExtended.newInitialPosition(TRIM_HORIZON)),
                    new StreamConfig(singleStreamInstance(secondStreamName),
                            InitialPositionInStreamExtended.newInitialPosition(TRIM_HORIZON)));

            @Override
            public List<StreamConfig> streamConfigList() {
                return configs;
            }

            @Override
            public FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy() {
                return new NoLeaseDeletionStrategy();
            }
        };

        ConfigsBuilder configsBuilder = new ConfigsBuilder(tracker, "KinesisPratTest", kinesisClient,
                dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new TestProcessorFactory(service));

        scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig().callProcessRecordsEvenForEmptyRecordList(false),
                configsBuilder.retrievalConfig()
        );

        new Thread(scheduler).start();

        producer = producer();
    }

    @AfterEach
    public void teardown() throws ExecutionException, InterruptedException, TimeoutException {
        producer.destroy();
        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        gracefulShutdownFuture.get(60, TimeUnit.SECONDS);
    }

    public KinesisProducer producer() {
        var configuration = new KinesisProducerConfiguration()
                .setVerifyCertificate(false)
                .setCredentialsProvider(localstack.getDefaultCredentialsProvider())
                .setMetricsCredentialsProvider(localstack.getDefaultCredentialsProvider())
                .setRegion(localstack.getRegion())
                .setCloudwatchEndpoint(localstack.getEndpointOverride(CLOUDWATCH).getHost())
                .setCloudwatchPort(localstack.getEndpointOverride(CLOUDWATCH).getPort())
                .setKinesisEndpoint(localstack.getEndpointOverride(KINESIS).getHost())
                .setKinesisPort(localstack.getEndpointOverride(KINESIS).getPort());

        return new KinesisProducer(configuration);
    }

    @Test
    void testFirstStream() {
        String expected = "Hello";

        producer.addUserRecord(firstStreamName, partitionKey, ByteBuffer.wrap(expected.getBytes(StandardCharsets.UTF_8)));

        var result = await().timeout(600, TimeUnit.SECONDS)
                .until(() -> service.getFirstStreamRecords().stream()
                        .flatMap(r -> r.records().stream())
                        .map(KinesisClientRecord::data)
                        .map(r -> StandardCharsets.UTF_8.decode(r).toString())
                        .collect(toList()), records -> records.size() > 0);

        assertThat(result).anyMatch(r -> r.equals(expected));
    }

    @Test
    void testSecondStream() {
        String expected = "Hello";

        producer.addUserRecord(secondStreamName, partitionKey, ByteBuffer.wrap(expected.getBytes(StandardCharsets.UTF_8)));

        var result = await().timeout(600, TimeUnit.SECONDS)
                .until(() -> service.getSecondStreamRecords().stream()
                        .flatMap(r -> r.records().stream())
                        .map(KinesisClientRecord::data)
                        .map(r -> StandardCharsets.UTF_8.decode(r).toString())
                        .collect(toList()), records -> records.size() > 0);

        assertThat(result).anyMatch(r -> r.equals(expected));
    }
}
Here is the error I am getting.
[Thread-9] ERROR software.amazon.kinesis.coordinator.Scheduler - Worker.run caught exception, sleeping for 1000 milli seconds!
java.lang.IllegalArgumentException: Unable to deserialize StreamIdentifier from first-stream-name
    at software.amazon.kinesis.common.StreamIdentifier.multiStreamInstance(StreamIdentifier.java:75)
    at software.amazon.kinesis.coordinator.Scheduler.getStreamIdentifier(Scheduler.java:1001)
    at software.amazon.kinesis.coordinator.Scheduler.buildConsumer(Scheduler.java:917)
    at software.amazon.kinesis.coordinator.Scheduler.createOrGetShardConsumer(Scheduler.java:899)
    at software.amazon.kinesis.coordinator.Scheduler.runProcessLoop(Scheduler.java:419)
    at software.amazon.kinesis.coordinator.Scheduler.run(Scheduler.java:330)
    at java.base/java.lang.Thread.run(Thread.java:829)
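The stack trace shows the Scheduler calling StreamIdentifier.multiStreamInstance(...) on the stored identifier, so in multi-stream mode the identifier apparently has to be in its serialized accountId:streamName:creationEpoch form rather than a bare stream name. A hedged sketch of how the tracker's StreamConfig list might be built that way follows; the account id and epoch values below are placeholders, not values taken from this setup.

// Sketch only: build StreamConfigs from serialized multi-stream identifiers.
// "000000000000" and "1" are placeholder accountId / creationEpoch values.
List<StreamConfig> configs = List.of(
        new StreamConfig(
                StreamIdentifier.multiStreamInstance("000000000000:" + firstStreamName + ":1"),
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)),
        new StreamConfig(
                StreamIdentifier.multiStreamInstance("000000000000:" + secondStreamName + ":1"),
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)));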
-
AWS IAM Cross-Account Roles
I'm trying to implement an AWS MultiStreamTracker to consume data from two different data streams. This consumer will have only one Kinesis client. The two data streams are in different AWS accounts. Is there a way to create an IAM role for just one consumer that can read from two data streams in two different accounts?
More on MultiStreamTracker here - https://docs.amazonaws.cn/en_us/streams/latest/dev/shared-throughput-kcl-consumers.html
-
Kinesis + Spark Streaming giving empty records
I am trying to read a Kinesis data stream with Spark Streaming. I am not getting any records in the output. My code does not give any errors; it just does not print anything to the console, even after feeding data to Kinesis. I have also tried playing around with TRIM_HORIZON and LATEST, but still no luck. Please find the code below:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
import time
from pyspark import StorageLevel

appName = "cdc_application"
sc = SparkContext(appName=appName)
ssc = StreamingContext(sc, 1)

streamName = 'cdc-stream'
endpointUrl = 'https://kinesis.ap-south-1.amazonaws.com'
regionName = 'ap-south-1'
checkpointInterval = 5

kinesisstream = KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
                                          InitialPositionInStream.LATEST, checkpointInterval,
                                          StorageLevel.MEMORY_AND_DISK_2)

kinesisstream.pprint()

ssc.start()
time.sleep(10)  # Run stream for 10 minutes just in case no detection of producer
ssc.stop(stopSparkContext=True, stopGraceFully=True)
Version: Spark 2.4.8 on EMR
Package used: org.apache.spark:spark-streaming-kinesis-asl_2.11:2.4.8
More details: My Kinesis stream is the target of AWS DMS, which is connected to a PostgreSQL database. I am getting the change records (CDC) in the Kinesis stream, which I am trying to process through Spark on EMR.
My output for this looks like below:
Time: 2022-05-05 05:19:33
Time: 2022-05-05 05:19:34
Time: 2022-05-05 05:19:35
Time: 2022-05-05 05:19:36
Time: 2022-05-05 05:19:37
Time: 2022-05-05 05:19:38
In the console, it gives the following output, but does not show any records:
Time: 2022-05-05 07:02:26
22/05/05 07:02:26 INFO JobScheduler: Finished job streaming job 1651734146000 ms.0 from job set of time 1651734146000 ms
22/05/05 07:02:26 INFO JobScheduler: Total delay: 0.014 s for time 1651734146000 ms (execution: 0.003 s)
22/05/05 07:02:26 INFO KinesisBackedBlockRDD: Removing RDD 844 from persistence list
22/05/05 07:02:26 INFO KinesisInputDStream: Removing blocks of RDD KinesisBackedBlockRDD[844] at createStream at NativeMethodAccessorImpl.java:0 of time 1651734146000 ms
22/05/05 07:02:26 INFO ReceivedBlockTracker: Deleting batches: 1651734144000 ms
22/05/05 07:02:26 INFO InputInfoTracker: remove old batch metadata: 1651734144000 ms
22/05/05 07:02:27 INFO JobScheduler: Added jobs for time 1651734147000 ms
22/05/05 07:02:27 INFO JobScheduler: Starting job streaming job 1651734147000 ms.0 from job set of time 1651734147000 ms
Time: 2022-05-05 07:02:27
-
Spring Integration: pick up files only when named by a trigger file
Using Spring Integration, I want to pick up only the files whose names are specified by a trigger file.
E.g. batch01.tar will be picked up only if .batch01-trigger is present; the trigger's filename is then updated depending on the processing outcome (.batch01-trigger.success or .batch01-trigger.failed).
I am polling an S3 bucket at the moment, but I couldn't work out how to apply the above use case.
Thanks!
I am using the AWS SDK and spring-integration-aws.
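One way this could be approached, shown only as a sketch and assuming the AWS SDK v1 client that spring-integration-aws 2.x works with: a custom FileListFilter for the S3 inbound adapter that accepts a data object only when its corresponding trigger object exists, plus a helper that "renames" the trigger by copy-and-delete once processing finishes. The bucket name, the trigger naming convention and the markTriggerOutcome helper are assumptions for illustration.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.S3ObjectSummary;

import org.springframework.integration.file.filters.FileListFilter;

public class TriggerFileListFilter implements FileListFilter<S3ObjectSummary> {

    private final AmazonS3 amazonS3;
    private final String bucket;

    public TriggerFileListFilter(AmazonS3 amazonS3, String bucket) {
        this.amazonS3 = amazonS3;
        this.bucket = bucket;
    }

    @Override
    public List<S3ObjectSummary> filterFiles(S3ObjectSummary[] files) {
        // accept batchNN.tar only when .batchNN-trigger exists in the bucket
        return Arrays.stream(files)
                .filter(summary -> amazonS3.doesObjectExist(bucket, triggerKeyFor(summary.getKey())))
                .collect(Collectors.toList());
    }

    private String triggerKeyFor(String dataKey) {
        // assumed convention: batch01.tar -> .batch01-trigger
        return "." + dataKey.replaceAll("\\.tar$", "") + "-trigger";
    }

    // After processing, "rename" the trigger object via copy + delete (S3 has no rename).
    public void markTriggerOutcome(String dataKey, boolean success) {
        String triggerKey = triggerKeyFor(dataKey);
        String outcomeKey = triggerKey + (success ? ".success" : ".failed");
        amazonS3.copyObject(bucket, triggerKey, bucket, outcomeKey);
        amazonS3.deleteObject(bucket, triggerKey);
    }
}

The filter could then be set on the S3InboundFileSynchronizer via setFilter(...), and markTriggerOutcome called from the success or error path of the downstream flow.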
-
Spring Integration: Inbound File Adapter drops files when service restarts
We're using the S3InboundFileSynchronizingMessageSource feature of Spring Integration to locally sync and then send messages for any files retrieved from an S3 bucket.
Before syncing, we apply a couple of S3PersistentAcceptOnceFileListFilter filters (to check the file's TimeModified and Hash/ETag) to make sure we only sync "new" files.
Note: we use a JdbcMetadataStore table to persist the record of the files that have previously made it through the filters (using a different REGION for each filter).
Finally, for the S3InboundFileSynchronizingMessageSource local filter, we have a FileSystemPersistentAcceptOnceFileListFilter -- again on TimeModified, and again persisted, but in a different region.
The issue is: if the service is restarted after the file has made it through the first filter, but before the message source has successfully sent the message along, we essentially drop the file and never actually process it.
What are we doing wrong? How can we avoid this "dropped file" issue?
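For reference, here is a condensed sketch of the kind of setup described above; the bucket name, metadata-store key prefixes, dataSource and amazonS3 references are placeholders for illustration, and the ETag-based check would in practice be a custom subclass of the accept-once filter.

// Sketch of the setup described above; names and wiring are illustrative only.
ConcurrentMetadataStore metadataStore = new JdbcMetadataStore(dataSource);

ChainFileListFilter<S3ObjectSummary> remoteFilter = new ChainFileListFilter<>();
// one persistent accept-once filter keyed on modified time, one on ETag,
// each with its own key prefix ("region") in the metadata store
remoteFilter.addFilter(new S3PersistentAcceptOnceFileListFilter(metadataStore, "s3-modified-"));
remoteFilter.addFilter(new S3PersistentAcceptOnceFileListFilter(metadataStore, "s3-etag-"));

S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(amazonS3);
synchronizer.setRemoteDirectory("my-bucket");
synchronizer.setFilter(remoteFilter);

S3InboundFileSynchronizingMessageSource messageSource =
        new S3InboundFileSynchronizingMessageSource(synchronizer);
messageSource.setLocalDirectory(new File("/tmp/s3-local"));
// local accept-once filter, persisted in yet another metadata-store region
messageSource.setLocalFilter(
        new FileSystemPersistentAcceptOnceFileListFilter(metadataStore, "local-"));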