AWS SQS FIFO queue behaviour with AWS Lambda reserved concurrency
So I have 10 message groups in a FIFO queue with 2 messages per group, and Lambda reserved concurrency set to 5. (The Lambda completes execution in 1 minute, and the SQS visibility timeout is set to 2 minutes.)
When all 20 messages are pushed to the queue, the SQS in-flight message count goes to 10; then, after the execution time, 5 messages are processed successfully and the other 5 move to the DLQ.
On the next round, the in-flight count goes to 5 (matching the reserved concurrency of 5) and everything processes as expected. (This should be the expected behaviour, right?)
Any particular reason why this is happening?
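The dead-lettering in the first round is consistent with a redrive policy whose maxReceiveCount is 1 (an assumption; the question doesn't state the redrive policy). A toy arithmetic sketch of that first round: the FIFO event source takes one message per group in flight, only 5 invocations fit under the reserved concurrency, and the throttled 5 run past their visibility timeout and are dead-lettered:

```python
def first_round(groups=10, concurrency=5, max_receive_count=1):
    """Toy model of the first SQS -> Lambda polling round.

    Assumptions (not stated in the question): the FIFO event source
    puts one message per group in flight, and the queue's redrive
    policy has maxReceiveCount=1, so a message whose visibility
    timeout expires once is dead-lettered rather than retried.
    """
    in_flight = groups                 # one in-flight message per group
    processed = min(concurrency, in_flight)
    stuck = in_flight - processed      # throttled past the visibility timeout
    dead_lettered = stuck if max_receive_count <= 1 else 0
    return in_flight, processed, dead_lettered

print(first_round())  # (10, 5, 5): matches the observed behaviour
```

If the goal is to cap concurrency without dead-lettering, SQS event source mappings also support a maximum-concurrency setting (ScalingConfig), which limits how many messages the poller hands to the function instead of letting invocations be throttled.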
See also questions close to this topic
-
Upload file from html when block public access is true
I am using django-s3direct (https://github.com/bradleyg/django-s3direct) for file upload, with an IAM role because I upload the file from the server in an ECS container.
Now I have set the blockPublicAccess of S3 to false. When uploading images from HTML, this error occurs:

```
https://s3.ap-northeast-1.amazonaws.com/static-resource-v/images/c64d6e593de44aa5b10dcf1766582547/_origin.jpg?uploads (403 (Forbidden)) initiate error: static-resource-v/line-assets/images/c64d6e593de44aa5b10dcf1766582547/_origin.jpg AWS Code: AccessDenied, Message: Access Denied, status: 403
```

OK, that is understandable: the browser tries to access the bucket to initiate the upload. However, is there any way to upload a file from the browser when blockPublicAccess is true?
-
Linux on Lightsail instance is asking for a password and it's not working
I'm trying to restart mariaDB on Ubuntu but it's not letting me. I enter:

```shell
systemctl restart mariadb
```

and get:

```
==== AUTHENTICATING FOR org.freedesktop.systemd1.manage-units === Authentication is required to restart 'mariadb.service'. Authenticating as: Ubuntu (ubuntu) Password: polkit-agent-helper-1: pam_authenticate failed: Authentication failure ==== AUTHENTICATION FAILED ===
```

I use the same password for everything, so I do not understand why it is not working. What can I do?
-
AWS Pinpoint sendMessages() Addresses param field error
I'm having trouble replicating the format of the params object's Addresses field in a way where I can easily add to the object.
If I use this as the params, with destinationNumber[0] and destinationNumber[1] in the format of 1 + a 9-digit number (i.e. 13334535667), then it sends the message to both numbers no problem:

```javascript
const params = {
  ApplicationId: applicationId,
  MessageRequest: {
    Addresses: {
      [destinationNumber[0]]: { ChannelType: 'SMS' },
      [destinationNumber[1]]: { ChannelType: 'SMS' }
    },
    MessageConfiguration: {
      SMSMessage: {
        Body: message,
        Keyword: registeredKeyword,
        MessageType: messageType,
        OriginationNumber: originationNumber
      }
    }
  }
};
```

I'm trying to replicate this format for Addresses, but I'm getting `Unexpected key '13334535667' found in params.MessageRequest.Addresses['0']`. The format my console output shows for Addresses is:

```javascript
[ { '12345678910': { ChannelType: 'SMS' } }, { '12345678911': { ChannelType: 'SMS' } } ]
```

I'm using a map to call this:

```javascript
function createPhoneMessagingObject(phoneNumber: string) {
  return {
    [phoneNumber]: { ChannelType: 'SMS' }
  };
}
```

I tried wrapping the key in an array like in the phone object, but per the output the brackets go away, so maybe there's an easier/more correct way of doing this. I appreciate any help!
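The error message suggests the shape mismatch: in the working params, Addresses is a single object keyed by phone number, while the map produces an array of one-key objects. Merging the per-number entries into one mapping resolves it. A sketch in Python mirroring the JS structures (the numbers are the question's own placeholders):

```python
def create_address_entry(phone_number):
    # one entry per phone number, mirroring createPhoneMessagingObject
    return {phone_number: {"ChannelType": "SMS"}}

def build_addresses(phone_numbers):
    # merge the per-number entries into a single mapping, which is
    # the object shape the working Addresses example uses
    addresses = {}
    for number in phone_numbers:
        addresses.update(create_address_entry(number))
    return addresses

addresses = build_addresses(["12345678910", "12345678911"])
# a single dict keyed by phone number, not a list of dicts
print(addresses)
```

In JavaScript the equivalent merge is a reduce with Object.assign (or spread) over the mapped entries, instead of passing the array itself as Addresses.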
-
AWS Lambda Function Direct Post Payload Always Null
I'm trying to use Lambda Functions (C#) with the Function URL for direct access. In Postman I'm sending a basic JSON body that matches the class properties of my input parameter (PostBody). When I execute the POST request, though, the values are always null. Is the input supposed to be something else besides the expected class?

```csharp
public string FunctionHandler(PostBody input, ILambdaContext context)
{
    LambdaLogger.Log(JsonSerializer.Serialize(input));
    return "Reached Here";
}
```
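A likely cause: Function URL invocations wrap the request in an API Gateway v2-style envelope, so the handler receives the whole event, not the raw POST body; the JSON arrives as a string under `body` (base64-encoded when `isBase64Encoded` is true), which is why deserializing the envelope into PostBody yields nulls. A sketch of extracting the payload under that assumption (shown in Python; in C# the equivalent is taking an API Gateway HTTP event type as the handler input and deserializing its Body):

```python
import base64
import json

def extract_post_body(event):
    """Pull the JSON payload out of a Lambda Function URL event.

    Assumes the v2 HTTP event shape: payload is a string under
    "body", base64-encoded when "isBase64Encoded" is true.
    """
    body = event.get("body", "")
    if event.get("isBase64Encoded"):
        body = base64.b64decode(body).decode("utf-8")
    return json.loads(body)

# Simulated Function URL event (field names follow the v2 HTTP shape)
event = {"body": json.dumps({"Name": "test"}), "isBase64Encoded": False}
print(extract_post_body(event))  # {'Name': 'test'}
```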
-
Is there a way to filter with AWS SNS filter policies from the message body to SQS queues?
I have a general question about SNS filter policies. I know that filter policies filter based on the message attributes. If we wanted to filter based on the body, is there a workaround to do so?
The SNS topic will be delivering different types of data to SQS queues based on the filter policies.
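As a side note, SNS has since added payload-based filtering (a subscription's FilterPolicyScope can be set to MessageBody). On setups without it, the usual workaround is to copy the fields you filter on into message attributes at publish time, so the existing attribute-based policies can match them. A sketch of building such Publish parameters (the topic ARN and field names are made up):

```python
import json

def build_publish_kwargs(topic_arn, payload, filter_key):
    """Build SNS Publish parameters, copying one body field into a
    message attribute so an attribute-based filter policy can match it.

    filter_key is assumed to be present in payload (hypothetical example).
    """
    return {
        "TopicArn": topic_arn,
        "Message": json.dumps(payload),
        "MessageAttributes": {
            filter_key: {
                "DataType": "String",
                "StringValue": str(payload[filter_key]),
            }
        },
    }

kwargs = build_publish_kwargs(
    "arn:aws:sns:us-east-1:123456789012:example-topic",  # hypothetical
    {"event_type": "order_created", "order_id": 17},
    "event_type",
)
# the SQS subscriptions' filter policies can then match on event_type
```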
-
How to import existing lambda from arn and add SQS as event source? Amplify
I'm trying to create an SNS topic that an SQS queue subscribes to which acts as an event source for a Lambda function. I'm trying to do this with the amplify cdk integration. However, there seems to be some problem when trying to reference the function which results in a permission problem.
```
CREATE_FAILED fetchMetadataSqsEventSourcesqsqueue2144E8FE AWS::Lambda::EventSourceMapping Fri May 06 2022 17:20:15 GMT+0200 (Central European Summer Time) Resource handler returned message: "Invalid request provided: The provided execution role does not have permissions to call ReceiveMessage on SQS (Service: Lambda, Status Code: 400, Request ID: 2b3147b0-8f59-4c35-8f0f-b7c29a45f139, Extended Request ID: null)" (RequestToken: c03cf5fb-283b-6d83-93c0-f7ee018338cd, HandlerErrorCode: InvalidRequest)
```

Here's my code:

```typescript
import * as AmplifyHelpers from "@aws-amplify/cli-extensibility-helper"
import * as iam from "@aws-cdk/aws-iam"
import * as lambda from "@aws-cdk/aws-lambda"
import { SqsEventSource } from "@aws-cdk/aws-lambda-event-sources"
import * as sns from "@aws-cdk/aws-sns"
import * as subs from "@aws-cdk/aws-sns-subscriptions"
import * as sqs from "@aws-cdk/aws-sqs"
import * as cdk from "@aws-cdk/core"
import { Duration } from "@aws-cdk/core"
import { AmplifyDependentResourcesAttributes } from "../../types/amplify-dependent-resources-ref"

export class cdkStack extends cdk.Stack {
  constructor(
    scope: cdk.Construct,
    id: string,
    props?: cdk.StackProps,
    amplifyResourceProps?: AmplifyHelpers.AmplifyResourceProps
  ) {
    super(scope, id, props)

    /* Do not remove - Amplify CLI automatically injects the current
       deployment environment in this input parameter */
    new cdk.CfnParameter(this, "env", {
      type: "String",
      description: "Current Amplify CLI env name",
    })

    /* AWS CDK code goes here - learn more:
       https://docs.aws.amazon.com/cdk/latest/guide/home.html */

    // Example 1: Set up an SQS queue with an SNS topic
    const amplifyProjectInfo = AmplifyHelpers.getProjectInfo()
    const sqsQueueResourceNamePrefix = `sqs-queue-${amplifyProjectInfo.projectName}`
    const queue = new sqs.Queue(this, "sqs-queue", {
      queueName: `${sqsQueueResourceNamePrefix}-${cdk.Fn.ref("env")}`,
      visibilityTimeout: Duration.seconds(30), // default
      receiveMessageWaitTime: Duration.seconds(20), // default
    })

    // 👇 create sns topic
    const snsTopicResourceNamePrefix = `sns-topic-${amplifyProjectInfo.projectName}`
    const topic = new sns.Topic(this, "sns-topic", {
      topicName: `${snsTopicResourceNamePrefix}-${cdk.Fn.ref("env")}`,
    })

    // 👇 subscribe queue to topic
    topic.addSubscription(new subs.SqsSubscription(queue))

    new cdk.CfnOutput(this, "snsTopicArn", {
      value: topic.topicArn,
      description: "The arn of the SNS topic",
    })

    const dependencies: AmplifyDependentResourcesAttributes =
      AmplifyHelpers.addResourceDependency(
        this,
        amplifyResourceProps.category,
        amplifyResourceProps.resourceName,
        [
          {
            category: "function", // api, auth, storage, function, etc.
            resourceName: "fetchMetadata", // find the resource at "amplify/backend/<category>/<resourceName>"
          } /* add more dependencies as needed */,
        ]
      )

    const fetchMetadataFnArn = cdk.Fn.ref(
      dependencies.function.fetchMetadata.Arn
    )

    const lambdaRole = new iam.Role(this, "Role", {
      assumedBy: new iam.ServicePrincipal("lambda.amazonaws.com"),
      description: "Example role...",
    })
    queue.grantConsumeMessages(lambdaRole)

    let fn = lambda.Function.fromFunctionAttributes(this, "fetchMetadata", {
      role: lambdaRole,
      functionArn: fetchMetadataFnArn,
    })
    queue.grantConsumeMessages(fn)

    const eventSource = new SqsEventSource(queue)
    fn.addEventSource(eventSource)
  }
}
```
Here's a snippet of the generated CloudFormation code, it seems like there might be an issue with the arn?
-
How to configure a consumer with the @ssut/nestjs-sqs library?
I'm trying to consume messages from an SQS queue but I'm having a hard time configuring it, since the documentation lacks examples. I've created an sqs-queue-module.ts:

```typescript
@Module({
  imports: [
    SqsModule.register({
      consumers: [
        {
          name: "sqs.myqueuename",
        },
      ],
    }),
  ],
  providers: [SqsQueueService],
  exports: [SqsModule],
})
```

My SqsQueueService only has a function that handles the arriving message, as the documentation shows:

```typescript
@Injectable()
export class SqsQueueService {
  @SqsMessageHandler("sqs.myqueuename", false)
  public async handleMessage(message: AWS.SQS.Message) {
    console.log(message);
  }
}
```

Finally, I import my sqs-queue-module.ts into app.module.ts. On AWS, I have a message available on the queue to be consumed, but I don't know how to make this message travel into my application. For a 'normal' POST/GET request I create the endpoint through my app.controller.ts, but since this is an SQS queue, maybe it's different.
My doubts are:
- Should the app consume it automatically, or should I trigger something?
- Maybe the queue name is wrong; I'm using the URL and not the ARN (I know we use the ARN in Lambda applications).
- Have I configured it properly?
-
What is the behaviour of SQS FIFO queue after visibility timeout expires?
- After the visibility timeout expires, will the message go to the same consumer or a different consumer in a FIFO queue?
- To ensure message ordering, the message has to be added back at the head of the queue when the visibility timeout expires. Is this assumption correct? If not, how is message ordering preserved when the visibility timeout expires? Please point me to any official documentation; I tried hard to find this in the official docs but had no luck.
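A toy model of the per-group guarantee may help frame the question: FIFO ordering is scoped to the message group, a group serves at most one in-flight message at a time, and a message whose visibility timeout expires becomes visible again ahead of the later messages in its group (FIFO does not pin a group to a particular consumer, so any consumer may receive it). This is a sketch of my reading, not a substitute for the official docs:

```python
from collections import deque

class FifoGroupModel:
    """Toy model of a single SQS FIFO message group."""

    def __init__(self, messages):
        self.available = deque(messages)
        self.in_flight = None

    def receive(self):
        # a group serves at most one in-flight message at a time
        if self.in_flight is None and self.available:
            self.in_flight = self.available.popleft()
        return self.in_flight

    def delete(self):
        self.in_flight = None

    def visibility_timeout_expires(self):
        # the undeleted message becomes visible again at the head of
        # its group, not the tail, so group ordering is preserved
        if self.in_flight is not None:
            self.available.appendleft(self.in_flight)
            self.in_flight = None

group = FifoGroupModel(["m1", "m2", "m3"])
assert group.receive() == "m1"
group.visibility_timeout_expires()  # consumer failed to delete in time
assert group.receive() == "m1"      # re-delivered before m2
```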
-
Cascading after_update operations with sqlalchemy not working
We have a parent/child table and want to be able to propagate updates up from the children/leaves to the parent root.
For example, given we have a structure like:

```json
{
  "id": 1,
  "state": "in-progress",
  "children": [
    {
      "id": 2,
      "state": "in-progress",
      "children": [{ "id": 3, "state": "in-progress" }]
    }
  ]
}
```
When `child 3`'s state gets updated to `completed`, we want to update the state of its parent `2` and its parent `1` to `completed` as well. We can do that when it's one level of separation, but for two levels the root node never gets updated. So node `2` will get updated to `completed` but `1` is stuck `in-progress`.
Here's what we have:
```python
class OperationNode(BaseModel):
    __tablename__ = "operation_node"

    id = Column(Integer, primary_key=True)

    # child -> parent
    parent_id = Column(
        Integer,
        ForeignKey("operation_node.id", ondelete="CASCADE", onupdate="CASCADE"),
        index=True,
    )

    # Track progress
    state = Column(String(15), default=OperationNodeStates.in_progress.value)

    # List of child objects with eager loading and max depth
    children = relationship(
        "OperationNode",
        foreign_keys=[parent_id],
        cascade="all, delete-orphan",
        lazy="joined",
        join_depth=2,
    )


@event.listens_for(OperationNode, "after_update")
def after_update_trigger_parent_sync(mapper, connection, operation_node):
    insp = db.inspect(operation_node)
    state_history = insp.attrs["state"].load_history()

    # Update aggregated fields of parent when a child has changes
    if operation_node.parent_id and state_history.has_changes():
        operation_node_table = OperationNode.__table__
        connection.execute(
            operation_node_table.update()
            .where(operation_node_table.c.id == operation_node.parent_id)
            .values(state="completed")
        )
```
so when we do:

```python
operation_node = OperationNode.query.get(3)
operation_node.state = "completed"
db.session.merge(operation_node)
db.session.commit()
```

only the direct parent `2` gets updated from the `after_update`, and not the root `1`. The `after_update` doesn't trigger the second time :(. Would be happy to share more code/context if needed. Are we missing something? Or is there a good example of this same use case out there? Any help is appreciated as we are very stuck!
Using sqlalchemy `1.3.24` and Flask-SQLAlchemy `2.4.4`.
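A likely cause: `after_update` only fires for objects flushed through the ORM session, and the listener updates the parent with a raw `connection.execute()`, which bypasses the ORM entirely, so no second event fires for the grandparent. One workaround (a sketch of the idea, not the only option) is to stop relying on the event to cascade and instead walk the whole ancestor chain inside the single listener, issuing one SELECT of `parent_id` plus one UPDATE per ancestor. The traversal, modeled with plain dicts so it runs standalone:

```python
def propagate_completed(nodes, start_id):
    """Walk the parent chain from one updated node, marking every
    ancestor completed.

    `nodes` maps id -> {"parent_id": ..., "state": ...}, standing in
    for the operation_node table. In the real listener each loop
    iteration would be a SELECT of parent_id and an UPDATE via
    connection.execute() against OperationNode.__table__.
    """
    parent_id = nodes[start_id]["parent_id"]
    while parent_id is not None:
        nodes[parent_id]["state"] = "completed"
        parent_id = nodes[parent_id]["parent_id"]

nodes = {
    1: {"parent_id": None, "state": "in-progress"},
    2: {"parent_id": 1, "state": "in-progress"},
    3: {"parent_id": 2, "state": "completed"},
}
propagate_completed(nodes, 3)
# both ancestors, including the root, are now completed
```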