Why is WebFlux so slow?
In my project, my boss requires us to use WebFlux for the server, but we only run synchronous (blocking) code in it. To my disappointment, every request takes 2~10 seconds to get a response. In JVisualVM I can see that the reactor-tcp-epoll thread is very busy while the http-nio-7777-exec threads look fine, so I am confused. Does anyone know what is actually going on here?
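The symptom described (a busy reactor-tcp-epoll thread while requests take seconds) is what blocking code running on a small event-loop pool looks like. A stdlib sketch of the effect, with a single-thread executor standing in for one Netty event-loop thread (no Reactor dependency; names and timings are illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class EventLoopStarvation {
    // Run two "requests" that each block for 200 ms on a single event-loop
    // thread, and return the total elapsed time in milliseconds.
    static long runTwoBlockingRequests() throws Exception {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        long start = System.nanoTime();
        Future<?> r1 = eventLoop.submit(() -> sleep(200)); // blocking "handler"
        Future<?> r2 = eventLoop.submit(() -> sleep(200)); // queued behind r1
        r1.get();
        r2.get();
        eventLoop.shutdown();
        return (System.nanoTime() - start) / 1_000_000;
    }

    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }

    public static void main(String[] args) throws Exception {
        // Two 200 ms blocking requests serialize on one loop thread:
        // roughly 400 ms total instead of roughly 200 ms in parallel.
        System.out.println("elapsed ~" + runTwoBlockingRequests() + " ms");
    }
}
```

In WebFlux the usual remedies are to move blocking work off the event loop (e.g. `publishOn(Schedulers.boundedElastic())`) or to use non-blocking clients end to end.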
See also: questions related to this topic
-
How can I use functional update with event.currentTarget.value in React?
First, please check my code.
const [name, setName] = useState('nick');

const handleChangeName = (e) => {
  setName(prevState => e.currentTarget.value);
}

return (
  <input value={name} onChange={handleChangeName} />
)
I'm trying to use a functional update instead of
setName(e.currentTarget.value)
However, with this code,
const handleChangeName = (e) => {
  setName(prevState => e.currentTarget.value);
}
I am not getting the right value for some reason. If you know what the problem is, please let me know. Thank you!
-
Generic Events Emitted from Component in Angular ~13
I'm trying to determine if I'm going down an undesirable path with how I'm approaching generic event management in an Angular application.
The question I have is: is there a more built-in or best-practices-conforming way to achieve the following?
Given a simple type, representing an item:
export interface Item {
  name: string;
}
I want to create a very generic item list component that supports emitting actions dictated by the containing component. I've created an Action type representing the kinds of actions that can be emitted, and an ActionEvent<TPayload> type representing the emitted action event:

export interface Action {
  name: string;
}

export interface ActionEvent<TPayload> {
  name: string;
  payload: TPayload;
}
The generic item list component is then defined as such:
@Component({
  selector: "list-component",
  template: `
    <table>
      <tbody>
        <tr *ngFor="let item of this.items">
          <td>{{item.name}}</td>
          <td>
            <button *ngFor="let action of this.actions"
                    (click)="this.doAction(action.name, item)">
              <span>{{action.name}}</span>
            </button>
          </td>
        </tr>
      </tbody>
    </table>
  `
})
export class ListComponent {
  @Input() items: Item[] = [];
  @Input() actions: Action[] = [];
  @Output() onActionEvent: EventEmitter<ActionEvent<Item>> = new EventEmitter<ActionEvent<Item>>();

  doAction(name: string, payload: Item) {
    this.onActionEvent.emit({name, payload});
  }
}
It takes an @Input of items: Item[] and actions: Action[]: the items to be displayed and the supported actions on those items, respectively. It also defines an @Output of EventEmitter<ActionEvent<Item>> to emit events for the items.

The container component could then be defined as such:
@Component({
  selector: "container-component",
  template: `
    <h1>Items</h1>
    <list-component [items]="this.items"
                    [actions]="this.actions"
                    (onActionEvent)="this.handleActionEvent($event)">
    </list-component>
  `
})
export class ContainerComponent {
  items: Item[] = [
    {name: "foo"},
    {name: "bar"},
    {name: "qux"},
  ];
  actions: Action[] = [
    {name: "view"},
    {name: "edit"},
  ];

  handleActionEvent(actionEvent: ActionEvent<Item>) {
    console.log(actionEvent);
  }
}
This then renders a table of the items, each row with one button per action.
Now, varying containers can define varying actions; some may support view and edit, others may support different actions. I could easily add a share action (and the corresponding handler code, which could itself be wrapped up in the Action type definition).

Where I'm performing console.log(actionEvent), the container's handler would be responsible for deciding what meaningful behavior to perform on view, edit, or whatever else.

So, as stated at the top, am I painting myself into an undesirable corner with this approach? I'm trying to stay very DRY, but I feel like I'm straying from KISS. Additionally, I'm as yet unaware of a built-in or more best-practices-conforming way to accomplish this.
-
How to get mouse event in terminal?
I am making a terminal game. How can I print the mouse position in a (Windows) terminal window when mouse button 1 is clicked (only inside the window, not anywhere on the screen)?
-
Why so many Debug Logs with "setHandshakeSuccess" in vertx / quarkus?
We are evaluating Quarkus and deployed one prototype with log level DEBUG, then just let it run for a few days. Our Logstash collapsed because of a java.lang.ArithmeticException, and while investigating we saw a great many setHandshakeSuccess logs from the new Quarkus web service.

To be exact: 2600 logs every 5 minutes. No one even uses the web service; it should be running idle and not produce so many logs.

Can anyone tell us whether this is normal behavior? Example log:
"message": "[id: 0x9869d583, L:/XX.XXX.XXX.XXX:8181 - R:/XX.XXX.X.XXX:49892] HANDSHAKEN: protocol:TLSv1.3 cipher suite:TLS_AES_256_GCM_SHA384"
"SourceClassName": "io.netty.handler.ssl.SslHandler",
"Thread": "vert.x-eventloop-thread-1",
The port number at the end of the second IP varies, but there is no network traffic. Does this message just mean that Quarkus is checking all of the available ports for traffic?

quarkus.platform.version = 2.8.1.Final
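Not an answer to why the handshakes happen, but if the immediate problem is log volume: Quarkus supports per-category log levels in application.properties, so the DEBUG noise from Netty's SslHandler can be raised to INFO. This is a sketch; the category name below is an assumption taken from the SourceClassName in the example log:

```properties
# application.properties: keep the app at DEBUG, but quiet Netty's SSL handler
quarkus.log.category."io.netty.handler.ssl".level=INFO
```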
-
CorruptedFrameException while using websocket
We are using a WebSocket, and I believe the connection is successful because I can see the handshake messages and connection logs, but I keep getting the warning below. I'm not sure what I am missing while establishing the WebSocket connection:
io.netty.channel.DefaultChannelPipeline - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.codec.CorruptedFrameException: data frame using reserved opcode 7
    at io.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder.protocolViolation(WebSocket08FrameDecoder.java:412)
I suspect the warning is raised at the time the message is flushed:
channelPipeline.addLast("flush", new FlushHandler(256, true));
How can I avoid, or at least handle, this exception?
-
Java Spring Webflux on Kubernetes: always [or-http-epoll-1], [or-http-epoll-2], [or-http-epoll-3], [or-http-epoll-4] despite configured resources
A small question regarding a Java 11 Spring WebFlux 2.6.6+ web app, containerized and deployed on Kubernetes, please.
In the application logs, I am seeing entries such as:
INFO [service,1bcce5941c742568,22c0ab2133c63a77] 11 --- [or-http-epoll-2] a.b.c.SomeClass : Some message from the reactive pipeline.
INFO [service,67cb40974712b3f4,15285d01bce9dfd5] 11 --- [or-http-epoll-4] a.b.c.SomeClass : Some message from the reactive pipeline.
INFO [service,5011dc5e09de30b7,f58687695bda20f2] 11 --- [or-http-epoll-3] a.b.c.SomeClass : Some message from the reactive pipeline.
INFO [service,8046bdde07b13261,5c30a56a4a603f4d] 11 --- [or-http-epoll-1] a.b.c.SomeClass : Some message from the reactive pipeline.
I only ever see [or-http-epoll-1], [or-http-epoll-2], [or-http-epoll-3], and [or-http-epoll-4], which I believe is the truncated form of [reactor-http-epoll-N].
The problem is, no matter how much CPU I allocate in Kubernetes, it is always those 4, no less, no more.
I tried:
resources:
  requests:
    cpu: 1
    memory: 1G
  limits:
    cpu: 2
    memory: 2G

resources:
  requests:
    cpu: 4
    memory: 4G
  limits:
    cpu: 6
    memory: 6G

resources:
  requests:
    cpu: 10
    memory: 10G
  limits:
    cpu: 10
    memory: 10G
But again, always only those 4.
I am having a hard time understanding what the problem is here, and why I am always stuck with exactly four "or-http-epoll-" threads.
Thank you
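For context on the number 4: Reactor Netty sizes its default event-loop pool from the JVM's view of the CPU count, with a floor of four (LoopResources defaults to max(availableProcessors, 4)), and inside a container availableProcessors reflects the cgroup CPU limit. A small sketch of that sizing rule; the floor value is an assumption based on Reactor Netty's documented default:

```java
public class WorkerCount {
    // Reactor Netty's default I/O worker count: max(available processors, 4).
    static int defaultIoWorkerCount(int availableProcessors) {
        return Math.max(availableProcessors, 4);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // With a Kubernetes CPU limit of 2, the JVM typically reports 2
        // processors, so the pool stays at the floor of 4. The
        // reactor.netty.ioWorkerCount system property can override it.
        System.out.println(cores + " visible cores -> "
                + defaultIoWorkerCount(cores) + " workers");
    }
}
```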
-
Handling exception with Mono flow
I have a WebFlux handler as below.
@Transactional
public Mono<ServerResponse> submitOrder(final ServerRequest request) {
    return context.retrieveUser().flatMap(usr -> {
        try {
            return Mono.zip(branchSetting, labOrderDetail, labOrderTests).flatMap(response -> {
                final Mono<String> submitOrderMono = service.submitOrder(usr);
                final Mono<Integer> updateStatusMono = orderRepository.updateStatus(orderId);
                return Mono.zip(submitOrderMono, updateStatusMono).flatMap(submitResponse -> {
                    return ok().bodyValue(submitResponse.getT1());
                }).onErrorResume(e -> {
                    if (e instanceof ServiceException) {
                        ServiceException ex = (ServiceException) e;
                        return status(ex.getStatusCode()).bodyValue(e.getMessage());
                    } else {
                        return status(500).bodyValue(e.getMessage());
                    }
                });
            });
        } catch (Throwable e) {
            if (e instanceof ServiceException) {
                ServiceException ex = (ServiceException) e;
                return status(ex.getStatusCode()).bodyValue(e.getMessage());
            } else {
                return status(500).bodyValue(e.getMessage());
            }
        }
    });
}
The submitOrder method from the service class:
public Mono<String> submitOrder(final Order order) {
    if (order.getPatientId() != null) {
        throw new ServiceException("Missing Id for patient !!!", HttpStatus.BAD_REQUEST.value());
    }
    // ...
}
Here, I am doing some validation and throwing an exception.

But this exception does not reach the onErrorResume or the catch block in the calling method, and hence the service caller sees a 500 error.

I'm not sure what is wrong here.
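For comparison: onErrorResume only sees errors that travel through the reactive pipeline, while an exception thrown before any Mono exists surfaces as a plain Java exception. The same eager-versus-deferred distinction can be sketched with stdlib CompletableFuture standing in for Mono (all names here are illustrative):

```java
import java.util.concurrent.CompletableFuture;

public class EagerVsDeferred {
    // Eager: validation throws before any future exists, so downstream
    // error handling (exceptionally, like onErrorResume) never sees it.
    static CompletableFuture<String> eager(String patientId) {
        if (patientId == null) throw new IllegalArgumentException("Missing Id");
        return CompletableFuture.completedFuture("ok");
    }

    // Deferred: the same check runs inside the async pipeline, so the
    // error is routed to the error channel instead of escaping.
    static CompletableFuture<String> deferred(String patientId) {
        return CompletableFuture.supplyAsync(() -> {
            if (patientId == null) throw new IllegalArgumentException("Missing Id");
            return "ok";
        });
    }

    public static void main(String[] args) {
        try {
            eager(null).exceptionally(e -> "handled");
        } catch (IllegalArgumentException e) {
            System.out.println("escaped: " + e.getMessage()); // escaped: Missing Id
        }
        System.out.println(deferred(null).exceptionally(e -> "handled").join()); // handled
    }
}
```

In Reactor the analogous move is to wrap the validation in Mono.defer, or to return Mono.error(...) instead of throwing, so that onErrorResume can observe the ServiceException.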
-
Spring Webflux - initial message without subscriber
I am building an SSE Spring application using WebFlux. According to the documentation, a message is not sent to the sink if there is no subscriber. In my use case, I would like a subscriber to receive the last message when it subscribes. I have found that a Sink can be configured in the following way:
Sinks.many().replay().latest();
When I already have both a publisher and a subscriber and the next subscriber subscribes, it receives the last sent message, which is great. However, if there are no subscribers yet, the publisher emits a message, and then the first subscriber arrives, it receives nothing. That is just as the documentation above says, actually, but I am thinking about how to solve this to meet my needs. As a workaround I did something like this:
if (shareSinks.currentSubscriberCount() == 0) {
    shareSinks.asFlux().subscribe();
}
shareSinks.tryEmitNext(shareDTO);
But having the publisher subscribe to its own sink doesn't sound like a clean way to do this...
-
Is Java NIO file read/write actually non-blocking I/O?
I'm learning about non-blocking I/O, mainly Java NIO. I'm trying to understand non-blocking I/O better and observe how it works in Java NIO's implementation.

I have read a dozen questions and answers related to non-blocking I/O in general and to non-blocking I/O using Java NIO.

I see the following statement, or something like it, everywhere:

Non-blocking IO does not wait for the data to be read or written before returning. Java NIO non-blocking mode allows the thread to request writing data to a channel, but Non-blocking IO does not wait for the data to be read or written before returning.

I tried to illustrate the statement with example Java code that reads a file's content using NIO. However, the thread is still blocked at the channel's read() method.
System.out.println("Thread-" + Thread.currentThread().getName() + "-" + Thread.currentThread().getId()); // Thread-main-1
String filePath = "./resources/nio-demo.txt";
FileInputStream fis = new FileInputStream(new File(filePath));
FileChannel fileChannel = fis.getChannel();
ByteBuffer buf = ByteBuffer.allocate(102400);
int bytesRead = fileChannel.read(buf);
System.out.println(buf.position()); // check how many bytes were written to the buf; always 102400
while (bytesRead != -1) {
    buf.flip();
    while (buf.hasRemaining()) {
        System.out.print((char) buf.get());
    }
    buf.clear();
    bytesRead = fileChannel.read(buf);
}
fis.close();
As I understand it, after triggering the read() method the thread should be able to execute the next line of code, regardless of whether the buf is full of data or not. But here, the thread is blocked until the buf is full, and only then continues.

The example above reads a file of about 300 MB with a buf of 102400 bytes.

This situation confused me a lot; perhaps I have misunderstood non-blocking I/O with Java NIO. Could you please help me explain it?

How does Java NIO achieve purely non-blocking I/O with only one thread?
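For what it's worth, FileChannel has no non-blocking mode at all: in NIO, non-blocking mode (configureBlocking(false)) only applies to selectable channels such as sockets. For files, NIO.2 offers AsynchronousFileChannel, whose read returns immediately with a Future. A minimal runnable sketch (the file name and contents are made up for the demo):

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Future;

public class AsyncReadDemo {
    // Read a whole (small) file through AsynchronousFileChannel.
    static String readAsync(Path path) throws Exception {
        try (AsynchronousFileChannel ch = AsynchronousFileChannel.open(path, StandardOpenOption.READ)) {
            ByteBuffer buf = ByteBuffer.allocate(1024);
            Future<Integer> pending = ch.read(buf, 0); // returns immediately
            // The calling thread is free to do other work while the read runs.
            int bytesRead = pending.get();             // block only when the result is needed
            return new String(buf.array(), 0, bytesRead);
        }
    }

    public static void main(String[] args) throws Exception {
        Path path = Files.createTempFile("nio-demo", ".txt");
        Files.writeString(path, "hello non-blocking world");
        System.out.println(readAsync(path)); // hello non-blocking world
        Files.delete(path);
    }
}
```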
-
Selecting a particular Column in a CSV-file Dynamically
I have this CSV file:
id,name,mark
20203923380,Lisa Hatfield,62
20200705173,Jessica Johnson,59
20205415333,Adam Harper,41
20203326467,Logan Nolan,77
And I'm trying to process it with this code:
try (Stream<String> stream = Files.lines(Paths.get(String.valueOf(csvPath)))) {
    DoubleSummaryStatistics statistics = stream
        .map(s -> s.split(",")[index]).skip(1)
        .mapToDouble(Double::valueOf)
        .summaryStatistics();
} catch (IOException e) {
    // more code
}
I want to select the column by its name. I guess I need to resolve the index from the column name the user enters, something like this:

int index = Arrays.stream(stream).indexOf(columnNS);

But it doesn't work.
The stream is supposed to contain the following values, for example, for the column "mark":

62, 59, 41, 77
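One way to resolve the index is to read the header line first and look the name up in it. A self-contained sketch using the sample data; the helper name is made up:

```java
import java.util.Arrays;
import java.util.DoubleSummaryStatistics;
import java.util.List;

public class CsvColumn {
    // Hypothetical helper: find the index of a column by its header name.
    static int columnIndex(String headerLine, String columnName) {
        List<String> headers = Arrays.asList(headerLine.split(","));
        int idx = headers.indexOf(columnName);
        if (idx < 0) throw new IllegalArgumentException("No such column: " + columnName);
        return idx;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
            "id,name,mark",
            "20203923380,Lisa Hatfield,62",
            "20200705173,Jessica Johnson,59",
            "20205415333,Adam Harper,41",
            "20203326467,Logan Nolan,77");
        int index = columnIndex(lines.get(0), "mark"); // 2
        DoubleSummaryStatistics stats = lines.stream().skip(1)
            .map(s -> s.split(",")[index])
            .mapToDouble(Double::parseDouble)
            .summaryStatistics();
        System.out.println(stats.getAverage()); // (62+59+41+77)/4 = 59.75
    }
}
```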
-
How to buffer and group elements in Reactor Flux in Java
Given an infinite flux of objects, where each object has an ID, how can I use Flux to create a buffered list of updates grouped by the ID property (keeping only the last emitted value per ID)? Thanks.
Example
Obj(ID=A, V=1)
Obj(ID=A, V=2)
Obj(ID=B, V=3)
--- buffer -> I want to subscribe with a list of [Obj(ID=A, V=2), Obj(ID=B, V=3)]

Obj(ID=A, V=1)
Obj(ID=B, V=4)
Obj(ID=B, V=6)
Obj(ID=A, V=2)
--- buffer -> I want to subscribe with a list of [Obj(ID=B, V=6), Obj(ID=A, V=2)]

Obj(ID=B, V=1)
--- buffer -> I want to subscribe with a list of [Obj(ID=B, V=1)]
Something like the following would be perfect, but in my tests it seems to wait for the end of the flux instead of buffering:
flux
    .buffer(Duration.ofMillis(2000))
    .groupBy(Obj::getId)
    .flatMap(GroupedFlux::getLast)
    .collectToList()
    .subscribe(this::printList);
It does work with buffer plus custom logic for the grouping:
public static void main(String[] args) {
    flux.buffer(Duration.ofMillis(2000)).subscribe(this::groupList);
}

private void groupList(List<T> ts) {
    Collection<T> values = ts.stream()
        .collect(Collectors.toMap(T::getId, Function.identity(), (k, v) -> v))
        .values();
    System.out.println(values);
}
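The keep-last step of that workaround can be isolated and tested on its own. A LinkedHashMap-backed variant (a sketch, with a made-up Obj record) additionally makes the iteration order deterministic, namely first-seen ID order, which differs from the last-emitted order shown in the example above:

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class KeepLast {
    record Obj(String id, int v) {}

    // Keep only the last value seen for each ID, preserving first-seen ID order.
    static Collection<Obj> latestById(List<Obj> buffered) {
        return buffered.stream()
            .collect(Collectors.toMap(Obj::id, Function.identity(),
                     (first, last) -> last,        // merge: keep the later element
                     LinkedHashMap::new))
            .values();
    }

    public static void main(String[] args) {
        List<Obj> buffer = List.of(
            new Obj("A", 1), new Obj("B", 4), new Obj("B", 6), new Obj("A", 2));
        System.out.println(latestById(buffer)); // [Obj[id=A, v=2], Obj[id=B, v=6]]
    }
}
```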
-
Project Reactor onErrorResume gets stuck
I'm using Project Reactor and I have a very long flow in which I get an exception (when parsing a string to JSON with Jackson). The thing is that even though I use

.map(this::parser)
.onErrorResume(err -> {
    log.error(myMsg);
    return Mono.empty();
})
.flatMap(writeToPulsar)
.subscribe()
the flow won't continue. I do see the error log, and the flow doesn't throw an exception, but it won't continue executing. Any reason for this to happen?
When I change the code to the (unwanted) .onErrorContinue(), the data pipeline doesn't get stopped:

.map(this::parser)
.onErrorContinue((err, msg) -> {
    log.error(myMsg);
})
.flatMap(writeToPulsar)
.subscribe()
-
Correct way of returning an object from an async operation to continue the Flux
I'd like to perform an async operation, e.g. writing to a database, and after that continue the Flux with the same object that the database write function received. The thing is that the response from the database transforms my incoming POJO into a response object, but I also need the incoming object for the next function (e.g. writeToKafka). Thus my workaround is to perform the following operation:

@Override
public void run(ApplicationArguments args) {
    Flux.from(KafkaReceiver.create(receiverOptions)
        .receive()
        .flatMap(this::writeToS3)
        .flatMap(this::writeToKafka)
    ).subscribe();
}

private Publisher<Message<ExampleData>> writeToS3(ExampleData message) {
    return Mono.fromFuture(s3Write())
        .doOnError(e -> log.error(e.getMessage()))
        .then(Mono.just(message)); // <<<<------------ This is the workaround
}
I'd like to know how this trick affects the upstream, and whether this is considered best practice for such use cases.
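The pattern itself (run the side effect, then continue with the original element) is a common one; Reactor even has thenReturn(message) as a shorthand for .then(Mono.just(message)). Its effect can be sketched with a stdlib CompletableFuture analogue (method and value names are made up):

```java
import java.util.concurrent.CompletableFuture;

public class PassThrough {
    // Perform an async "write" whose response type differs from the input,
    // then continue with the original input element instead of the response.
    static CompletableFuture<String> writeToS3(String message) {
        CompletableFuture<Integer> s3Response =
            CompletableFuture.supplyAsync(() -> 200); // pretend S3 status code
        return s3Response.thenApply(status -> message); // drop response, keep input
    }

    public static void main(String[] args) {
        System.out.println(writeToS3("order-42").join()); // order-42
    }
}
```

Note that errors still propagate: then(...) only replaces the completion value, so a failed write does not get silently swallowed by the pass-through.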