Differential or full data event on a topic
Let us assume there is a product topic from which clients (consumers of the topic) build their own local product databases for their needs. The topic contains a public subset of all product attributes.
Which is better: always sending the full product data on every change (create, update, delete), or sending just the differential?
What is the best way to initialize the local databases, and how should extending the public attribute set be handled if some clients need additional attributes?
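For illustration only (the event shape and field names below are hypothetical, not taken from the question), a "full" event would repeat the complete public view of the product on every change:

{ "type": "ProductChanged", "productId": "42", "version": 7,
  "data": { "name": "Kettle", "price": 19.99, "color": "red" } }

whereas a "differential" event would carry only the changed attributes, plus the identity and version information needed to apply changes in order:

{ "type": "ProductChanged", "productId": "42", "version": 8,
  "changes": { "price": 17.99 } }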
See also questions close to this topic
-
C# assign an event to a "group" of controls?
I have a big number of custom label controls, let's say 100. I would like to give them a MouseHover event. I could do something like:
private void label_custom1_MouseHover(object sender, EventArgs e)
{
    TextBox.Text = label_custom1.BackColor.ToString();
}
But then I would need to do that 100 times, since I have 100 of them. Is there a way to do it only once? I guess I should probably declare the handler in my custom_label class, but so far I couldn't make it work. Any idea how to proceed?
-
How to avoid EventEmitter type error in TypeScript?
In a TypeScript source file src/index.ts, I am using EventEmitter. When I run the typecheck with yarn workspace my_project typecheck, I get this error:
yarn workspace v1.22.10
yarn run v1.22.10
$ tsc --noEmit
src/index.ts:28:4 - error TS2709: Cannot use namespace 'EventEmitter' as a type.

28 ): EventEmitter {
      ~~~~~~~~~~~~

src/index.ts:30:24 - error TS2351: This expression is not constructable.
  Type 'typeof EventEmitter' has no construct signatures.

30   const eventHub = new EventEmitter()
                          ~~~~~~~~~~~~

src/index.ts:102:13 - error TS2709: Cannot use namespace 'EventEmitter' as a type.

102   eventHub: EventEmitter
                ~~~~~~~~~~~~

Found 3 errors.
error Command failed with exit code 1.
info Visit https://yarnpkg.com/en/docs/cli/run for documentation about this command.
error Command failed.
Exit code: 1
Command: /Users/user/.nvm/versions/node/v14.15.0/bin/node
Arguments: /Users/user/.nvm/versions/node/v14.15.0/lib/node_modules/yarn/lib/cli.js typecheck
Directory: /Users/user/my_project
Output:
info Visit https://yarnpkg.com/en/docs/cli/workspace for documentation about this command.
The source near lines 28, 30 and 102 looks like:
import EventEmitter from "events"

//...

function buildEventHub(
  taskId: number,
  jobs: MyJob[],
  pool: Pool
): EventEmitter {
  const jobIds = jobs.map(({ id }) => id)
  const eventHub = new EventEmitter()
  //...

async function send(
  taskId: number,
  jobs: MyJob[],
  eventHub: EventEmitter
): Promise<void> {
  const body: RawRequestParams = {
    taskId,
    urls: jobs.map(({ canonicalUrl }) => canonicalUrl)
  }
Why did the error occur?
- node version: v8.15.0
- typescript version: ^3.5.3
-
matplotlib event "motion_notify_event" axes doesn't match the corresponding fig.axes
I am using ipywidgets with matplotlib to make an interactive Jupyter notebook. Relevant imports:
%matplotlib widget
import ipywidgets as widgets
[figure: visualization of the graph]
I am trying to add hover information over the bars. For that I use the following functions (only the relevant parts are shown):

fig3out = widgets.Output()
with fig3out:
    fig3 = mp.pyplot.figure()

fig3.canvas.mpl_connect("motion_notify_event", mousehoverbar)

@cuadro4capturas.capture()
def update_annot(patch, text):
    pos = patch.get_bbox().bounds
    x = pos[0] + pos[2]/2
    y = pos[3]
    text.set_position([x, y])

@cuadro4capturas.capture()
def mousehoverbar(event):
    text = fig3.texts[1]
    vis = text.get_visible()
    print(event.inaxes)
    print(fig3.get_axes()[-2])
    if (event.inaxes == fig3.get_axes()[-2]):
        print('its inside')
        for patch in fig3.axes[-2].patches:
            cont, dicc = patch.contains(event)
            if cont:
                update_annot(patch, text)
                text.set_visible(True)
            else:
                if vis:
                    text.set_visible(False)

display(cuadro4capturas)

[Out]:
AxesSubplot(0.606034,0.11;0.293966x0.456296)
AxesSubplot(0.606034,0.11;0.293966x0.456296)
The text object is pre-created with the figure; I just hide it and change its position:
fig3.text(0, 0, 'some text', horizontalalignment='center', verticalalignment='center',
          transform=ax2.transData, color='white', visible=False, size='large',
          bbox=dict(facecolor='black', alpha=0.5, boxstyle="round"))
I have two problems:
- For some reason, the code is not going inside the if (event.inaxes == fig3.get_axes()[-2]) branch, even though event.inaxes points to the same object (you can see that in the display output).
- If I remove the if, it works as expected, but the text that I'm trying to show blinks all the time and turns off even when I am inside one bar; sometimes after moving a little the text is visible again.
-
Quarkus Kafka - Batch/Bulk message consumer
I want to do batch processing. In my use case, the Kafka producer sends messages one by one, and I want to read them as a list in the consumer application. I can do that with the Spring Kafka library (Spring Kafka batch listener).
Is there any way to do this with the quarkus-smallrye-reactive-messaging-kafka library?
I tried the example below but got an error.
ERROR [io.sma.rea.mes.provider] (vert.x-eventloop-thread-3) SRMSG00200: The method org.MyConsumer#aggregate has thrown an exception: java.lang.ClassCastException: class org.TestConsumer cannot be cast to class io.smallrye.mutiny.Multi (org.TestConsumer is in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @6f2c0754; io.smallrye.mutiny.Multi is in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @4c1638b)
application.properties:
kafka.bootstrap.servers=hosts
mp.messaging.connector.smallrye-kafka.group.id=KafkaQuick
mp.messaging.connector.smallrye-kafka.auto.offset.reset=earliest
mp.messaging.incoming.test-consumer.connector=smallrye-kafka
mp.messaging.incoming.test-consumer.value.deserializer=org.TestConsumerDeserializer
TestConsumerDeserializer:
public class TestConsumerDeserializer extends JsonbDeserializer<TestConsumer> {

    public TestConsumerDeserializer() {
        // pass the class to the parent.
        super(TestConsumer.class);
    }
}
MyConsumer:
@ApplicationScoped
public class MyConsumer {

    @Incoming("test-consumer")
    //@Outgoing("aggregated-channel")
    public void aggregate(Multi<Message<TestConsumer>> in) {
        System.out.println(in);
    }
}
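Not an authoritative answer, but here is a minimal sketch of one way batching might be done with the Mutiny API available in Quarkus, assuming the stream-transformer method style (both @Incoming and @Outgoing on one method) is supported by the Quarkus version in use; the channel name aggregated-channel, the class name and the batch size of 50 are made up:

import java.util.List;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class MyBatchingConsumer {

    // Stream-transforming method: consumes the Kafka channel as a Multi and
    // re-emits it grouped into lists of (up to) 50 payloads.
    @Incoming("test-consumer")
    @Outgoing("aggregated-channel")
    public Multi<List<TestConsumer>> aggregate(Multi<TestConsumer> in) {
        return in.group().intoLists().of(50);
    }

    // Plain consumer of the in-memory aggregated-channel.
    @Incoming("aggregated-channel")
    public void consume(List<TestConsumer> batch) {
        System.out.println("received " + batch.size() + " records");
    }
}

The ClassCastException above is most likely because a void method annotated only with @Incoming is treated as a per-payload consumer, so each deserialized TestConsumer is handed to a parameter declared as Multi. Newer versions of the Kafka connector also document a batch mode (mp.messaging.incoming.test-consumer.batch=true together with a List<TestConsumer> parameter), which may be simpler if the version in use supports it.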
-
Override schemas.enable in kafka-connect connector
I have a distributed Kafka Connect (6.x) cluster with multiple connectors installed. Key and value converters are set to org.apache.kafka.connect.json.JsonConverter. For one of the connectors (an S3 sink) I need to consume events without a schema, so the schemas.enable properties are set to false in the cluster configs:

key.converter.schemas.enable=False
value.converter.schemas.enable=False

For others (Debezium MySQL source connectors) I need to add the schema to the event, so the properties should be true. I know that it's possible to override configs in the connectors, but apparently this only works for producer. and consumer. configs. I tried overriding producer.key.converter.schemas.enable in the source connectors, but it doesn't seem to have any effect. Is there a way to achieve this within a single Kafka Connect cluster?
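Not a definitive answer, but as far as I understand Kafka Connect, converter settings are not part of the producer./consumer. override mechanism at all; instead, key.converter / value.converter and their child properties can be set directly in an individual connector's configuration, where they take precedence over the worker defaults. A hypothetical per-connector override for the S3 sink could look like this (the connector name and topic are placeholders):

{
  "name": "s3-sink-no-schema",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "my-topic",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}

With that in place, the worker-level schemas.enable could stay true for the Debezium source connectors (or be overridden there the same way).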
-
How to get logical types from schema registry to avro files using Kafka GCS Connector
I'm loading Avro files into GCS using the Kafka GCS connector. In my schema in the Schema Registry I have logical types on some of my columns, but it seems like they're not being transferred to the files. How can logical types from a schema be transferred to the Avro files? Here is my connector configuration, for what it's worth:
{ "connector.class": "io.confluent.connect.gcs.GcsSinkConnector", "confluent.topic.bootstrap.servers": "kafka.internal:9092", "flush.size": "200000", "tasks.max": "300", "topics": "prod_ny, prod_vr", "group.id": "gcs_sink_connect", "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.RecordNameStrategy", "gcs.credentials.json": "---", "confluent.license: "---", "value.converter.schema.registry.url": "http://p-og.prod:8081", "gcs.bucket.name": "kafka_load", "format.class": "io.confluent.connect.gcs.format.avro.AvroFormat", "gcs.part.size": "5242880", "confluent.topic.replication.factor": "1", "name": "gcs_sink_prod", "value.converter": "io.confluent.connect.avro.AvroConverter", "storage.class": "io.confluent.connect.gcs.storage.GcsStorage", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "auto.offset.reset": "latest" }
-
Event Store - Filter out any deleted streams from projections
I have deleted some streams but I'm still getting those streams when my projection starts.
Is there a way to filter out any deleted streams from projections?
I know that the streams have been deleted because a StreamDeletedException is raised when I use code like this:

var events = _eventStoreConnection.ReadStreamAsync(
    Direction.Forwards,
    e.Event.EventStreamId,
    StreamPosition.Start);

await foreach (var @event in events) { }
-
How to read events stored in Axon Event Store?
I am struggling to understand how to connect to a default Axon Event Store and see what events are stored there. Is there a way to preview it somehow?
How do I replay stored events to re-create a particular state of a stored entity? I am trying to find an example, tutorial, or video lesson on the internet and cannot find anything 😐... How do I make a snapshot and then retrieve it? I cannot find any online example of how to do it... Can someone advise, or share a link to a discussion here on StackOverflow if this has been asked before?
Thank you 🙏
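Not an authoritative walkthrough, just a minimal sketch of one way to peek at stored events through Axon's own API, assuming an Axon Framework EventStore instance is available for injection and the aggregate identifier is known; the class and method names are made up:

import org.axonframework.eventsourcing.eventstore.EventStore;

public class EventInspector {

    private final EventStore eventStore;

    public EventInspector(EventStore eventStore) {
        this.eventStore = eventStore;
    }

    // Prints every stored domain event of one aggregate, in sequence order.
    public void printEvents(String aggregateId) {
        eventStore.readEvents(aggregateId)
                .asStream()
                .forEach(event -> System.out.println(
                        event.getSequenceNumber() + " "
                        + event.getPayloadType().getSimpleName() + " "
                        + event.getPayload()));
    }
}

If Axon Server is the event store, its web dashboard (HTTP port 8024 by default, if I remember correctly) can also browse events; with the JPA-based store the events typically end up in a domain_event_entry table (the exact name depends on the JPA naming strategy) and can be inspected with plain SQL.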
-
How to achieve data consistency in a newly added microservice?
For example, we have microservices with event sourcing. To achieve data consistency we use the following approach:
- A microservice generates an event
- The event is stored in an event store
- The event is published to the subscribed microservices
This approach works fine with microservices that are already in use. But what if I need to deploy another microservice that needs to synchronize data with the event store? Obviously, this new microservice has missed all the previously published events.
Is this new microservice supposed to pull events from the event store by itself?
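One common answer (a rough sketch only; EventStoreClient, EventBus and StoredEvent below are entirely hypothetical interfaces, not from any specific library) is yes: the new service first replays the store from the beginning, remembers the position it reached, and only then switches to the live subscription, skipping anything it has already seen:

import java.util.List;
import java.util.function.Consumer;

// Hypothetical interfaces that only illustrate the catch-up-then-subscribe flow.
interface StoredEvent { long position(); Object payload(); }
interface EventStoreClient { List<StoredEvent> readAllFrom(long position); }
interface EventBus { void subscribe(Consumer<StoredEvent> handler); }

public class NewServiceBootstrap {

    private final EventStoreClient eventStore;
    private final EventBus liveBus;
    private long lastProcessedPosition = -1;

    public NewServiceBootstrap(EventStoreClient eventStore, EventBus liveBus) {
        this.eventStore = eventStore;
        this.liveBus = liveBus;
    }

    public void start() {
        // 1. Catch up: replay everything that is already in the event store.
        for (StoredEvent event : eventStore.readAllFrom(0)) {
            apply(event);
            lastProcessedPosition = event.position();
        }
        // 2. Go live: subscribe to new events, deduplicating by position.
        liveBus.subscribe(event -> {
            if (event.position() > lastProcessedPosition) {
                apply(event);
                lastProcessedPosition = event.position();
            }
        });
    }

    private void apply(StoredEvent event) {
        // Project the event into this service's own local state.
    }
}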
-
IPC microservices AMQP and resilience
I am creating an architecture for my microservices platform running on Kubernetes. My current architecture looks like this:
Description:
- I use Flask to create a RESTful API.
- For my IPC mechanism and event-driven communication, I use RabbitMQ.
- The microservice contains code for a RabbitMQ producer and consumer.
- When the Flask app is started, a consumer is instantiated within a child process (using the multiprocessing library). The process is not joined and is not killed during the whole lifetime of the main (Flask) app.
- The producer is instantiated only when a request (POST/PUT) is called.
And I was wondering: what if my RabbitMQ crashes?
- The consumer in the microservice will no longer live, because it will raise an exception when connecting to RabbitMQ.
- The microservice API (Flask) will continue to live.
So my question is the following:
Is it good practice to separate the consumer process into an independent container (sketched below)?
-> The container will run alongside the main app in the same pod.
-> The sidecar consumer will have a liveness endpoint, so if RabbitMQ crashes again, Kubernetes will restart only this container.
-> The sidecar consumer will have access to the database to write events.
-> The producer can stay in the main (Flask) app, which is more resilient for me.
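Not a recommendation, just a sketch of what the sidecar layout described above could look like as a Kubernetes pod spec; the names, images, /health path and ports are all placeholders:

apiVersion: v1
kind: Pod
metadata:
  name: my-service
spec:
  containers:
    - name: flask-api              # main app: REST API + producer
      image: registry.example.com/my-service-api:latest
      ports:
        - containerPort: 5000
    - name: rabbitmq-consumer      # sidecar: blocking RabbitMQ consumer
      image: registry.example.com/my-service-consumer:latest
      livenessProbe:               # failing probe restarts only this container
        httpGet:
          path: /health
          port: 8080
        initialDelaySeconds: 10
        periodSeconds: 15

When a liveness probe fails, Kubernetes restarts the failing container rather than the whole pod, which matches the behaviour described above.
-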
AWS event driven approach - Cloud Watch vs S3 event notification
I am building an event-driven system, which starts as soon as a new file lands in S3. I am evaluating different ways of achieving that, and using a CloudWatch rule + a CloudTrail API trail is one option. This is the CloudWatch Event pattern as it is:
{ "source": [ "aws.s3" ], "detail-type": [ "AWS API Call via CloudTrail" ], "detail": { "eventSource": [ "s3.amazonaws.com" ], "eventName": [ "PutObject" ], "requestParameters": { "bucketName": [ "mysupertest88" ] } } }
Like that, it triggers the rule for every file landing in the bucket, but trying to filter by key with a wildcard does not work:
"requestParameters": { "bucketName": [ "mysupertest88" ], "key": ["myprefix/mysecondprefix/*"] }
It works only if I specify a key that matches exactly, without a wildcard; I think that's because the symbol '*' is a valid character in S3 object keys. An alternative is to filter directly at the trail level, but I do not see that as a nice option, as the API trail is often out of the developer's control.
Is S3 event notification then the best way to accomplish this? What is your experience with that?
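For comparison (the Lambda ARN below and the function it points to are placeholders), S3 event notifications support prefix/suffix filtering natively, so the same routing could be expressed roughly like this notification configuration, applied with aws s3api put-bucket-notification-configuration:

{
  "LambdaFunctionConfigurations": [
    {
      "LambdaFunctionArn": "arn:aws:lambda:eu-west-1:123456789012:function:my-handler",
      "Events": ["s3:ObjectCreated:*"],
      "Filter": {
        "Key": {
          "FilterRules": [
            { "Name": "prefix", "Value": "myprefix/mysecondprefix/" }
          ]
        }
      }
    }
  ]
}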
-
How do you handle client requests in an event-driven architecture?
I have been researching event-driven architecture and I see its benefits. The one thing I fail to understand is how to use this architecture to handle responses to client requests. Imagine a scenario where the client makes a request and that request either:
- has to store user data, generate a new piece of data and send it back to the client in the same request, or
- spans multiple microservices, and the result must be sent back to the client.
I'm thinking of communication between frontend and backend via WebSockets: maybe when we send a request we open a socket and wait for a response, and once a response has arrived, close the socket. Or do you use REST for handling requests/responses while the server does everything else with events?
If you know of any books or talks that cover this concern specifically, please share.
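Not an answer, just a minimal sketch of the "REST in front, events behind" idea from the question, using a correlation id to match the eventual result to the original request; CommandBus, ResultStore, the resource paths and the JAX-RS choice are all assumptions of mine:

import java.util.Optional;
import java.util.UUID;

import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Response;

@Path("/orders")
public class OrderResource {

    // Hypothetical collaborators: publish a command event, look up a stored result.
    interface CommandBus { void publish(String correlationId, String payload); }
    interface ResultStore { Optional<String> find(String correlationId); }

    private final CommandBus commandBus;
    private final ResultStore results;

    public OrderResource(CommandBus commandBus, ResultStore results) {
        this.commandBus = commandBus;
        this.results = results;
    }

    @POST
    public Response submit(String body) {
        String correlationId = UUID.randomUUID().toString();
        // Publish the event and return immediately; downstream services do the work.
        commandBus.publish(correlationId, body);
        return Response.status(Response.Status.ACCEPTED)
                .header("Location", "/orders/" + correlationId)
                .build();
    }

    @GET
    @Path("/{id}")
    public Response result(@PathParam("id") String correlationId) {
        // The client polls this endpoint (or is notified over a WebSocket) until
        // a consumer of the "work finished" event has written the result.
        return results.find(correlationId)
                .map(r -> Response.ok(r).build())
                .orElse(Response.status(Response.Status.NOT_FOUND).build());
    }
}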