Pub/Sub message published from a Cloud Run hosted application takes a long time
I have a simple Spring Boot application hosted in Google Cloud Run that publishes a Google Pub/Sub message to a topic in the same project. Publishing takes a long time, roughly 5 minutes. Below is the code I use to publish the Pub/Sub message. The same code worked fine with no delay in the App Engine environment.
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage); // publisher is a com.google.cloud.pubsub.v1.Publisher
ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<String>() {
    @Override
    public void onFailure(Throwable throwable) {
        if (throwable instanceof ApiException) {
            ApiException apiException = ((ApiException) throwable);
            // details on the API exception
            log.error("APIException Status Code: {}", apiException.getStatusCode().getCode());
            log.error("APIException is Retryable: {}", apiException.isRetryable());
        }
        log.error("Error publishing message: {}", pubsubMessage);
    }

    @Override
    public void onSuccess(String messageId) {
        log.info("Success msg after publish: {}", messageId);
    }
}, MoreExecutors.directExecutor());
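For context, the Publisher instance itself is created and shut down along these lines (project and topic IDs are placeholders, not the real values):

// Rough sketch of the surrounding setup.
TopicName topicName = TopicName.of("my-project-id", "my-topic-id");
Publisher publisher = Publisher.newBuilder(topicName).build();

// ... publish and register the callback as shown above ...

// Flush pending messages and release resources when done.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);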
How can I overcome this delay in publishing the Pub/Sub message?
1 answer
-
answered 2020-11-25 14:56
Andrew
If I remember correctly, Spring Boot creates a JAR file that contains nested JARs. In Cloud Run these files get unpacked during startup, and this can impact the startup time.
Try using a Jib plugin to containerise your application and see if that improves your timings.
I imagine that you will also need to optimise how your app is containerised. Are you using a Multi-Stage Build?
I am not overly familiar with Spring Boot but this article discusses how to containerise your code for best performance.
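For reference, a minimal Jib setup with the Gradle plugin looks roughly like this (plugin version and image name are placeholders, not values from the question):

plugins {
    id("com.google.cloud.tools.jib") version "2.7.0" // example version
}

jib {
    to {
        image = "gcr.io/my-project/my-app" // placeholder image name
    }
}

Building and pushing the image is then a single ./gradlew jib invocation, with no Dockerfile required.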
See also questions close to this topic
- How to make Navigation drawer like this?
-
URL resource = this.getClass().getResource("file-name") is null
Completely puzzled. I do:
URL resource = this.getClass().getResource("eye-visible.png");
and the URL evaluates to null. "eye-visible.png" is a file in the same package as my class, Controller.java.
I also tried (actually I started with this):
InputStream inputStream = Controller.class.getResourceAsStream("eye-visible.png");
and this returns null as well.
Really stuck - any clue?
This is a modular project and I run on JDK 14. I use NetBeans as my IDE. The lines above are in the start method of a JavaFX application. I reproduced the same null output with the lines above in a new Java project that has just a main class: same effect.
-
Enums Switch Statement for Type
public enum Black {
    BLACK, WHITE;

    private boolean BLACK() {
        boolean b = true;
        switch (this) {
            case BLACK:
                return true;
            case WHITE:
                return false;
        }
        return b;
    }
}
With the above code, I am trying to check whether my enum is BLACK or WHITE. However, when I debug this with an instance of BLACK, both cases in the switch statement appear to evaluate to true. What am I doing wrong and how can I fix this?
-
Error in linking Spring Boot and PostgreSQL (Kotlin)
I want to send a table from IntelliJ to PostgreSQL using Spring Boot. The project is written in Kotlin and built with Gradle.
The data arrives as far as Postman, but not PostgreSQL. Also, no error occurs, so I cannot find the cause.
Below are my code and dependencies.
application.properties
spring.datasource.url=jdbc:postgresql://localhost:5432/spring
spring.datasource.username=postgres
spring.datasource.password=1234
UserInfo.kt
import javax.persistence.*

@Entity
data class UserInfo(
    @Id val id: String,
    var pw: String,
    var name: String,
    var birth: String
)
Usercontroller.kt
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
import org.springframework.web.bind.annotation.*
import javax.annotation.PostConstruct

@RestController
@EnableAutoConfiguration(exclude = [DataSourceAutoConfiguration::class])
class UserController {
    private val userMap: MutableMap<String, UserInfo> = mutableMapOf()

    @PostConstruct
    fun init() {
        userMap["id1"] = UserInfo("id1", "password1", "Alice", "1001")
        userMap["id2"] = UserInfo("id2", "password2", "Rabit", "1101")
        userMap["id3"] = UserInfo("id3", "password3", "Card", "1201")
    }

    @GetMapping(path = ["/user/{id}"])
    fun getUserInfo(@PathVariable("id") id: String) = userMap[id]

    @GetMapping(path = ["user/all"])
    fun getUserInfoAll() = ArrayList<UserInfo>(userMap.values)

    @PutMapping(path = ["/user/{id}"])
    fun putUserInfo(@PathVariable("id") id: String, @RequestParam("pw") pw: String,
                    @RequestParam("name") name: String, @RequestParam("birth") birth: String) {
        val userInfo = UserInfo(id, pw, name, birth)
        userMap[id] = userInfo
    }

    @PostMapping(path = ["/user/{id}"])
    fun postUserInfo(@PathVariable("id") id: String, @RequestParam("pw") pw: String,
                     @RequestParam("name") name: String, @RequestParam("birth") birth: String) {
        val userInfo = userMap[id]
        userInfo?.pw = pw
        userInfo?.name = name
        userInfo?.birth = birth
    }

    @DeleteMapping(path = ["/user/{id}"])
    fun deleteUserInfo(@PathVariable("id") id: String) = userMap.remove(id)
}
UserRepository.kt
import org.springframework.data.repository.reactive.ReactiveCrudRepository
import org.springframework.stereotype.Repository

@Repository
interface UserRepository : ReactiveCrudRepository<UserInfo, String>
The Gradle build file is in the attachment.
I think there may be a problem in UserRepository.kt, but I can't find it because there is no error.
Thank you for reading. I wish you a nice day.
-
Public URLs with Spring Cloud Gateway secured by UAA?
I found this tutorial on securing Spring Cloud Gateway. It works well, but I would like to allow unregistered users access to specific URLs (let's say /public).
I modified the example as follow:
public class SecurityConfig {

    @Bean
    SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) throws Exception {
        http
            .authorizeExchange()
            .pathMatchers("/resource")
            .permitAll();
        return http.build();
    }
}
But this doesn't do the trick as it appears the Gateway intercepts any traffic and redirects to the login page.
Is that normal and just the way UAA works or am I missing something here?
-
Unauthorized despite set environment variable - How to debug?
My application has many endpoints and I want to secure two of them with basic authentication. Therefore I set both environment variables SPRING_SECURITY_USER_NAME and SPRING_SECURITY_USER_PASSWORD, and I log them to confirm that they are set. In my subclass of WebSecurityConfigurerAdapter I have:
protected void configure(HttpSecurity http) throws Exception {
    http.csrf().disable().authorizeRequests().antMatchers("/").permitAll()
        .antMatchers("/endpoint1").authenticated()
        .antMatchers("/endpoint2").authenticated()
        .and().httpBasic();
    http.headers().frameOptions().disable();
}
I expect all but those two endpoints to be accessible without a password, and those two to be accessible with the correct credentials. If I run the app locally, my expectations are met. However, if I deploy the app to an external environment, the two endpoints return error code 401 Unauthorized even with the "correct" credentials, which I can confirm via the log.
I deploy to a non-production environment and the credentials are dummy values, so I can pretty much log anything, but I don't know where to go from here. How can I determine which credentials are actually in effect, and why those are used instead of the ones I provide?
Additional information: my app is a multi-module Maven project. I use the same code as above in a single-module project (appB) and that project works fine both locally and deployed.
-
Choosing between PubSub Backend Function or Http Function
We are designing the listening side of our Google Pub/Sub setup and will be using Google Cloud Functions for this. There are two choices: push and a backend function. Push uses HTTP and pushes messages to the function; a backend function is invoked by a trigger.
Where we are having trouble is deciding which approach may be better for our application. We have not been able to find a list of differences between these two methods within the Google documentation.
Some things we have noticed are that:
- HTTP functions use an Express.js-style structure, whereas backend functions are only a function (see the sketch below).
- It appears message retries are only available for backend functions.
- It isn't clear if and how dead letter queues work with backend functions.
- Are there differences between the number of messages per second which can be handled?
- So many more items we aren't thinking of to ask...
The question here is: what should we consider in order to choose between an HTTP function and a backend function for receiving Pub/Sub messages?
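For illustration, here is a rough sketch of the two entry-point shapes in Python (function names and details are assumptions for the example, not taken from our code):

import base64

# Backend (Pub/Sub-triggered) function: the platform decodes the trigger event for us.
def handle_pubsub(event, context):
    payload = base64.b64decode(event["data"]).decode("utf-8")
    print(f"Received: {payload}")

# HTTP function: a push subscription POSTs a JSON envelope that we unwrap ourselves.
def handle_push(request):
    envelope = request.get_json()
    payload = base64.b64decode(envelope["message"]["data"]).decode("utf-8")
    print(f"Received: {payload}")
    return ("", 204)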
-
Dataflow template "Pub/Sub Avro to Bigquery" fails to decode
I'm trying to stream data from Pub/Sub to Bigquery via the Dataflow template "Pub/Sub Avro to Bigquery". The data in Pub/Sub is in AVRO format and coming from a Kafka topic. The corresponding schema file I got from the schema registry. This is what it looks like:
{"type":"record","name":"KsqlDataSourceSchema","namespace":"io.confluent.ksql.avro_schemas","fields":[{"name":"ID","type":["null","string"],"default":null},{"name":"TIMESTAMP","type":["null","string"],"default":null}]}
There is no newline in the saved schema.avsc and I'm getting this error in dataflow:
2021-01-22 10:31:28.231 MEZ Error message from worker:
java.lang.RuntimeException: Could not decode Pubsub message
    org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.lambda$parsePayloadUsingCoder$839baa85$1(PubsubIO.java:1139)
    org.apache.beam.sdk.transforms.Contextful.lambda$fn$36334a93$1(Contextful.java:112)
    org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:140)
Caused by: org.apache.beam.sdk.coders.CoderException: 47 unexpected extra bytes after decoding {"ID": null, "TIMESTAMP": null}
    org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:101)
    org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
    org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.lambda$parsePayloadUsingCoder$839baa85$1(PubsubIO.java:1137)
    org.apache.beam.sdk.transforms.Contextful.lambda$fn$36334a93$1(Contextful.java:112)
    org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:140)
    org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:335)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
    org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1400)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:156)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1101)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    java.lang.Thread.run(Thread.java:748)
(the same RuntimeException and stack trace is repeated three more times in the log)
When I consume the messages from my topic manually, I am able to decode them with the exact same schema; however, I need to take care of five extra bytes in front of my message. The original message out of Pub/Sub looks like this:
b'\x00\x00\x00\x00\x0c\x02\x1656173684800\x02:2021-01-22T10:21:40.384+01:00'
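My manual decoding works roughly like this (a sketch, with library choice and names only illustrative; the five skipped bytes appear to be the Confluent wire-format header of one magic byte plus a 4-byte schema ID):

import io
import json
from fastavro import parse_schema, schemaless_reader

# Load the same schema.avsc that is passed to the Dataflow template.
with open("schema.avsc") as f:
    schema = parse_schema(json.load(f))

def decode(raw: bytes) -> dict:
    # Skip the 5-byte header before handing the payload to the Avro reader.
    return schemaless_reader(io.BytesIO(raw[5:]), schema)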
I suspect that I need to change something in my schema.avsc file for Dataflow to handle the extra bytes correctly, but I'm not sure how, or whether that is even the right approach.
I hope someone can point me in the right direction, thanks in advance.
-
Unable to read Pub/Sub messages with Apache Beam (Python SDK)
I'm trying to stream messages from a Pub/Sub topic with the Beam programming framework (Python SDK) and write them out to the console.
This is my code:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

TOPIC_PATH = "projects/<project-id>/topics/<topic-id>"

def run(pubsub_topic):
    options = PipelineOptions(
        streaming=True
    )
    runner = 'DirectRunner'

    print("I reached before pipeline")

    with beam.Pipeline(runner, options=options) as pipeline:
        (
            pipeline
            | "Read from Pub/Sub topic" >> beam.io.ReadFromPubSub(topic=pubsub_topic)
            | "Writing to console" >> beam.Map(print)
        )

    print("I reached after pipeline")

    result = pipeline.run()
    result.wait_until_finish()

run(TOPIC_PATH)
When I execute this pipeline however, I get this TypeError:
ERROR:apache_beam.runners.direct.executor:Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x1349763c0>, due to an exception. TypeError: create_subscription() takes from 1 to 2 positional arguments but 3 were given
In the end it says:
ERROR:apache_beam.runners.direct.executor:Giving up after 4 attempts.
I'm not sure what I'm doing wrong; thanks in advance for your help.
-
Why doesn't my newest Cloud Run revision receive traffic?
I am trying to deploy a Python (Gunicorn) application on Cloud Run (fully managed). Most of the time the newest deployed revision receives 100% of traffic directly after I run the deploy command. However, from time to time, the deploy command says the newest revision receives 0% of traffic (and when I try to reach the application, I am indeed redirected to an older version).
gcloud beta run deploy my-app \
  --platform=managed \
  --allow-unauthenticated \
  --project my-project \
  --region europe-west1 \
  --port=80 \
  --memory=1Gi \
  --service-account my-app@my-project.iam.gserviceaccount.com \
  --min-instances=1 \
  --image=europe-west1-docker.pkg.dev/my-project/my-registry/my-app:latest

Deploying container to Cloud Run service [my-app] in project [my-project] region [europe-west1]
Service [my-app] revision [my-app-00044-zay] has been deployed and is serving 0 percent of traffic.
Why doesn't my revision receive traffic?
-
Use Terraform for Google Cloud Run
I'm looking into using Terraform to automate setting up an environment for demos.
This works for a VM instance and can be fully automated, but management prefers to use Cloud Run with Docker containers.
When I read this article, it starts with manually having to build and register a Docker container. I don't get that step: why can't that be automated with Terraform as well?
-
Connectivity between Cloud Run and Cloud SQL (Internal IP)
I have created my organisation infrastructure in GCP following the Cloud Foundation Toolkit using the Terraform modules provided by Google.
The following table lists the IP ranges for all environments:
Now I am in the process of deploying my application, which consists basically of Cloud Run services and a Cloud SQL (Postgres) instance. The Cloud SQL instance was created with a private IP from the "unallocated" IP range that is reserved for peered services (such as Cloud SQL).
In order to establish connectivity between Cloud Run and Cloud SQL, I have also created the Serverless VPC Connector (ip range 10.1.0.16/28) and configured the Cloud SQL proxy.
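For reference, the Cloud Run service is deployed roughly like this (a sketch with placeholder names, not the exact command used):

gcloud run deploy my-service \
  --image=gcr.io/my-project/my-service \
  --region=europe-west1 \
  --service-account=my-app-sa@my-project.iam.gserviceaccount.com \
  --vpc-connector=my-connector \
  --add-cloudsql-instances=my-project:europe-west1:platform-db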
When I try to connect to the database from the Cloud Run service I get this error after ~10s:
CloudSQL connection failed. Please see https://cloud.google.com/sql/docs/mysql/connect-run for additional details: Post "https://www.googleapis.com/sql/v1beta4/projects/[my-project]/instances/platform-db/createEphemeral?alt=json&prettyPrint=false": context deadline exceeded
I have granted roles/vpcaccess.user for both the default Cloud Run SA and the one used by the application in the host project.
I have granted roles/compute.networkUser for both SAs in the service project. I also granted roles/cloudsql.client for both SAs.
I have enabled servicenetworking.googleapis.com and vpcaccess.googleapis.com in the service project.
I have run out of ideas and I can't figure out what the issue is.
It seems like a timeout error when Cloud Run tries to create a POST request to the Cloud SQL API. So it seems like the VPC connector (10.1.0.16/28) cannot connect to the Cloud SQL instance (10.0.80.0/20).
Has anyone experienced this issue before?