Getting lambda response when calling lambda through SQS
I have the following pipeline:
Script --> SQS --> Lambda
- A script sends a message to an SQS queue.
- Based on the content of this message, the Lambda executes a different process (calls an API with some payload).
- The script needs to receive the API's HTTP response, i.e. the output of the Lambda.
How can I achieve this last step? (Ideally using boto3)
1 answer
-
answered 2022-01-18 12:42
luk2302
You can't / don't. If you want a response from the lambda, invoke the lambda directly and synchronously. Putting a queue between you and the lambda explicitly decouples your script from the lambda.
You could include some path or identifier in the message which tells the lambda where to put the response, e.g. under which DynamoDB entry or into which S3 key. Your script would then need to poll that target location for an update from the lambda. But that to me sounds overly complicated, and I am not sure what you gain over just calling the lambda directly.
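A minimal boto3 sketch of the direct, synchronous route (function name and payload are placeholders):

import json
import boto3

lambda_client = boto3.client("lambda")

# RequestResponse blocks until the function returns, so the script gets
# the Lambda's output (here, whatever the Lambda reports about the API call).
response = lambda_client.invoke(
    FunctionName="my-function",  # hypothetical function name
    InvocationType="RequestResponse",
    Payload=json.dumps({"action": "call-api", "payload": {"key": "value"}}),
)
result = json.loads(response["Payload"].read())
print(result)

If you keep the queue, the callback variant would be: generate an id in the script, include it in the SQS message, have the lambda write its result under that id (an S3 key or DynamoDB item), and poll that location from the script until the result appears.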
See also questions close to this topic
-
Upload file from HTML when block public access is true
I am using django-s3direct for file upload: https://github.com/bradleyg/django-s3direct
I am using the IAM role setting because I upload the file from the server in an ECS container.
Now I set the blockPublicAccess of S3 to false. When uploading images from HTML, this error occurs:
https://s3.ap-northeast-1.amazonaws.com/static-resource-v/images/c64d6e593de44aa5b10dcf1766582547/_origin.jpg?uploads (403 (Forbidden) ) initiate error: static-resource-v/line-assets/images/c64d6e593de44aa5b10dcf1766582547/_origin.jpg AWS Code: AccessDenied, Message:Access Deniedstatus:403
OK, it is understandable: the browser tries to hit the bucket directly to initiate the upload.
However, is there any way to upload a file from the browser when blockPublicAccess is true?
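Not a confirmed fix, but for context: Block Public Access only rejects anonymous/public access, while requests signed with valid credentials still succeed. A minimal boto3 sketch of generating a presigned POST on the server (bucket and key are placeholders), which the browser could then submit without the bucket being public:

import boto3

s3 = boto3.client("s3")

# Hypothetical bucket and key, for illustration only.
presigned = s3.generate_presigned_post(
    Bucket="static-resource-v",
    Key="images/example/_origin.jpg",
    ExpiresIn=300,  # seconds the form POST stays valid
)

# presigned["url"] plus presigned["fields"] make up a form POST that is
# authenticated by its signature, so Block Public Access does not apply
# to it the way it applies to an anonymous browser request.
print(presigned["url"], presigned["fields"])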
-
Linux on Lightsail instance is asking for a password and it's not working
I'm trying to restart mariaDB on Ubuntu but it's not letting me. I enter:
systemctl restart mariadb
and get:
==== AUTHENTICATING FOR org.freedesktop.systemd1.manage-units ===
Authentication is required to restart 'mariadb.service'.
Authenticating as: Ubuntu (ubuntu)
Password:
polkit-agent-helper-1: pam_authenticate failed: Authentication failure
==== AUTHENTICATION FAILED ===
I have the same password for all functions so I do not understand why it is not working. What can I do?
-
AWS Pinpoint sendMessages() Addresses param field error
I'm having trouble replicating the format of the params object's Addresses field in a way where I can easily add entries to the object.
If I use this as the params, with destinationNumber[0] and destinationNumber[1] in the format of a 1 plus a 10-digit number, i.e. 13334535667, then it sends the message to both numbers no problem:
const params = {
  ApplicationId: applicationId,
  MessageRequest: {
    Addresses: {
      [destinationNumber[0]]: { ChannelType: 'SMS' },
      [destinationNumber[1]]: { ChannelType: 'SMS' }
    },
    MessageConfiguration: {
      SMSMessage: {
        Body: message,
        Keyword: registeredKeyword,
        MessageType: messageType,
        OriginationNumber: originationNumber
      }
    }
  }
};
I'm trying to replicate this format for Addresses, but I'm getting Unexpected key '13334535667' found in params.MessageRequest.Addresses['0']. The format my console output shows for Addresses is:
[
  { '12345678910': { ChannelType: 'SMS' } },
  { '12345678911': { ChannelType: 'SMS' } }
]
I'm using a map to call this:
function createPhoneMessagingObject(phoneNumber: string) {
  return {
    [phoneNumber]: { ChannelType: 'SMS' }
  };
}
I tried wrapping the key in an array like in the phone object, but per the output, the brackets go away, so maybe there's an easier/more correct way of doing this. I appreciate any help!
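For comparison, Addresses has to be a single object keyed by phone number rather than an array of one-key objects (which is what map returns). A sketch of the same request in Python/boto3, with hypothetical IDs and numbers, building the merged dict from a list:

import boto3

# Hypothetical placeholders.
application_id = "myPinpointAppId"
numbers = ["+12345678910", "+12345678911"]

client = boto3.client("pinpoint")

# One dict keyed by destination number -- the equivalent of merging the
# per-number objects instead of collecting them in a list.
addresses = {number: {"ChannelType": "SMS"} for number in numbers}

response = client.send_messages(
    ApplicationId=application_id,
    MessageRequest={
        "Addresses": addresses,
        "MessageConfiguration": {
            "SMSMessage": {"Body": "hello", "MessageType": "TRANSACTIONAL"},
        },
    },
)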
-
AWS Lambda Function Direct Post Payload Always Null
I'm trying to use Lambda Functions (C#) with the Function URL for direct access. In Postman I'm sending a basic JSON body that matches the class properties in my input parameter (PostBody). When I execute the POST request, the values are always null though. Is the input supposed to be something else besides the expected class?
public string FunctionHandler(PostBody input, ILambdaContext context)
{
    LambdaLogger.Log(JsonSerializer.Serialize(input));
    return "Reached Here";
}
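One thing to rule out (an assumption, since the serializer setup isn't shown): a Function URL doesn't hand your JSON body to the handler as the top-level event. It wraps the request in an API Gateway v2-style event whose POST payload is a JSON string under the body field, so binding straight to PostBody can leave every property null. A Python sketch of the shape actually delivered:

import json

def handler(event, context):
    # With Function URLs the HTTP payload arrives as a JSON string in
    # event["body"] (base64-encoded when isBase64Encoded is true; the
    # simple case is assumed here), not as top-level fields of the event.
    body = json.loads(event.get("body") or "{}")
    print(body)
    return "Reached Here"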
-
Is there a way to filter with AWS SNS filter policies from the message body to SQS queues?
I have a general question about SNS filter policies. I know that filter policies filter based on the message attributes. If we wanted to filter based on the body, is there a workaround to do so?
The SNS topic will be delivering different types of data to SQS queues based on the filter policies.
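One option, assuming a recent enough SNS API (payload-based filtering was added later than attribute-based filtering): a subscription's FilterPolicyScope attribute switches the filter policy from the message attributes to the message body. A minimal boto3 sketch with a hypothetical subscription ARN and policy:

import boto3

sns = boto3.client("sns")

# Hypothetical subscription ARN, for illustration only.
subscription_arn = "arn:aws:sns:us-east-1:123456789012:my-topic:1234abcd"

# Apply the filter policy to the message body instead of the attributes.
sns.set_subscription_attributes(
    SubscriptionArn=subscription_arn,
    AttributeName="FilterPolicyScope",
    AttributeValue="MessageBody",
)
sns.set_subscription_attributes(
    SubscriptionArn=subscription_arn,
    AttributeName="FilterPolicy",
    AttributeValue='{"eventType": ["orderCreated"]}',  # hypothetical body field
)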
-
How to import existing lambda from arn and add SQS as event source? Amplify
I'm trying to create an SNS topic that an SQS queue subscribes to, which acts as an event source for a Lambda function. I'm trying to do this with the Amplify CDK integration. However, there seems to be a problem when referencing the function, which results in a permission error.
CREATE_FAILED fetchMetadataSqsEventSourcesqsqueue2144E8FE AWS::Lambda::EventSourceMapping Fri May 06 2022 17:20:15 GMT+0200 (Central European Summer Time) Resource handler returned message: "Invalid request provided: The provided execution role does not have permissions to call ReceiveMessage on SQS (Service: Lambda, Status Code: 400, Request ID: 2b3147b0-8f59-4c35-8f0f-b7c29a45f139, Extended Request ID: null)" (RequestToken: c03cf5fb-283b-6d83-93c0-f7ee018338cd, HandlerErrorCode: InvalidRequest)
Here's my code
import * as AmplifyHelpers from "@aws-amplify/cli-extensibility-helper"
import * as iam from "@aws-cdk/aws-iam"
import * as lambda from "@aws-cdk/aws-lambda"
import { SqsEventSource } from "@aws-cdk/aws-lambda-event-sources"
import * as sns from "@aws-cdk/aws-sns"
import * as subs from "@aws-cdk/aws-sns-subscriptions"
import * as sqs from "@aws-cdk/aws-sqs"
import * as cdk from "@aws-cdk/core"
import { Duration } from "@aws-cdk/core"
import { AmplifyDependentResourcesAttributes } from "../../types/amplify-dependent-resources-ref"

export class cdkStack extends cdk.Stack {
  constructor(
    scope: cdk.Construct,
    id: string,
    props?: cdk.StackProps,
    amplifyResourceProps?: AmplifyHelpers.AmplifyResourceProps
  ) {
    super(scope, id, props)

    /* Do not remove - Amplify CLI automatically injects the current
       deployment environment in this input parameter */
    new cdk.CfnParameter(this, "env", {
      type: "String",
      description: "Current Amplify CLI env name",
    })

    /* AWS CDK code goes here - learn more:
       https://docs.aws.amazon.com/cdk/latest/guide/home.html */

    // Example 1: Set up an SQS queue with an SNS topic
    const amplifyProjectInfo = AmplifyHelpers.getProjectInfo()
    const sqsQueueResourceNamePrefix = `sqs-queue-${amplifyProjectInfo.projectName}`
    const queue = new sqs.Queue(this, "sqs-queue", {
      queueName: `${sqsQueueResourceNamePrefix}-${cdk.Fn.ref("env")}`,
      visibilityTimeout: Duration.seconds(30), // default,
      receiveMessageWaitTime: Duration.seconds(20), // default
    })

    // 👇 create sns topic
    const snsTopicResourceNamePrefix = `sns-topic-${amplifyProjectInfo.projectName}`
    const topic = new sns.Topic(this, "sns-topic", {
      topicName: `${snsTopicResourceNamePrefix}-${cdk.Fn.ref("env")}`,
    })

    // 👇 subscribe queue to topic
    topic.addSubscription(new subs.SqsSubscription(queue))

    new cdk.CfnOutput(this, "snsTopicArn", {
      value: topic.topicArn,
      description: "The arn of the SNS topic",
    })

    const dependencies: AmplifyDependentResourcesAttributes =
      AmplifyHelpers.addResourceDependency(
        this,
        amplifyResourceProps.category,
        amplifyResourceProps.resourceName,
        [
          {
            category: "function", // api, auth, storage, function, etc.
            resourceName: "fetchMetadata", // find the resource at "amplify/backend/<category>/<resourceName>"
          } /* add more dependencies as needed */,
        ]
      )

    const fetchMetadataFnArn = cdk.Fn.ref(
      dependencies.function.fetchMetadata.Arn
    )

    const lambdaRole = new iam.Role(this, "Role", {
      assumedBy: new iam.ServicePrincipal("lambda.amazonaws.com"),
      description: "Example role...",
    })

    queue.grantConsumeMessages(lambdaRole)

    let fn = lambda.Function.fromFunctionAttributes(this, "fetchMetadata", {
      role: lambdaRole,
      functionArn: fetchMetadataFnArn,
    })

    queue.grantConsumeMessages(fn)

    const eventSource = new SqsEventSource(queue)
    fn.addEventSource(eventSource)
  }
}
Here's a snippet of the generated CloudFormation code; it seems like there might be an issue with the ARN?
-
Send SVG image content to email as HTML using boto3
I want to send SVG content (without a saved file) as HTML to an Outlook email address. This SVG content works fine in a browser, showing a circle image; however, sending it via boto3.client to an Outlook email results in an empty mail. Why? Any suggestions appreciated.
import io
import json
import boto3
from botocore.exceptions import ClientError

SENDER = "Name1 LastName1 <Name1.LastName1@mail.com>"
RECIPIENT = "Name2 LastName2 <Name2.LastName2@mail.com>"
AWS_REGION = "us-east-1"
SUBJECT = "something"

svg = """
<svg width="100" height="100">
  <circle cx="50" cy="50" r="40" stroke="green" stroke-width="4" fill="yellow" />
</svg>
"""

BODY_HTML = f"""<html>
<body>
{svg}
</body>
</html>
"""

CHARSET = "UTF-8"

client = boto3.client('ses', region_name=AWS_REGION)

try:
    response = client.send_email(
        Destination={'ToAddresses': [RECIPIENT]},
        Message={
            'Body': {'Html': {'Charset': CHARSET, 'Data': BODY_HTML}},
            'Subject': {'Charset': CHARSET, 'Data': SUBJECT},
        },
        Source=SENDER,
    )
except ClientError as e:
    print(e.response['Error']['Message'])
else:
    print("Email sent! Message ID:")
    print(response['MessageId'])
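A hedged aside on the "Why?": Outlook's desktop rendering engine is known not to support inline <svg> markup, which would explain the empty mail even though the same HTML works in a browser. One workaround sketch is to ship the SVG as an attachment via send_raw_email (SENDER, RECIPIENT and svg mirror the script above):

import boto3
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

SENDER = "Name1 LastName1 <Name1.LastName1@mail.com>"
RECIPIENT = "Name2 LastName2 <Name2.LastName2@mail.com>"
svg = '<svg width="100" height="100"><circle cx="50" cy="50" r="40" stroke="green" stroke-width="4" fill="yellow" /></svg>'

msg = MIMEMultipart("mixed")
msg["Subject"] = "something"
msg["From"] = SENDER
msg["To"] = RECIPIENT

# Plain HTML body; the image travels as a file instead of inline markup.
msg.attach(MIMEText("<html><body><p>Circle attached.</p></body></html>", "html"))

part = MIMEApplication(svg.encode("utf-8"), _subtype="svg+xml")
part.add_header("Content-Disposition", "attachment", filename="circle.svg")
msg.attach(part)

client = boto3.client("ses", region_name="us-east-1")
client.send_raw_email(
    Source=SENDER,
    Destinations=[RECIPIENT],
    RawMessage={"Data": msg.as_string()},
)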
-
Load and cache images from AWS S3 into flutter
I want to fetch and cache user profile pictures from S3 into my Flutter app.
First, when a user uploads a picture, my Flask backend generates a random file name, stores the file in an S3 bucket (using boto3), and stores the name in the database.
To retrieve the picture I use presigned URLs:
s3client = boto3.client(
    's3',
    config=Config(signature_version='s3v4', region_name='eu-west-2'),
)
s3client.generate_presigned_url(
    'get_object',
    Params={'Bucket': BUCKET, 'Key': file_name_retrieved_from_db_for_user},
    ExpiresIn=120,
)
In Flutter I have a Future which calls the API and gets the image's generated presigned URL (i.e. https://xx.s3.amazonaws.com/FILENAME.jpg?signature). Then, using a FutureBuilder, I do the following:
FutureBuilder(
  future: get_picture_url(user_id),
  builder: (context, snapshot) {
    if (snapshot.connectionState == ConnectionState.done) {
      if (snapshot.data == 0) {
        return Icon(Icons.account_circle, size: 110.0);
      }
      print(user_id);
      print('this is the data fetched');
      print(snapshot.data);
      return CachedNetworkImage(
        imageUrl: snapshot.data,
        imageBuilder: (context, imageProvider) => Container(
          width: 180.0,
          height: 180.0,
          decoration: BoxDecoration(
            shape: BoxShape.circle,
            image: DecorationImage(image: imageProvider, fit: BoxFit.cover),
          ),
        ),
        placeholder: (context, url) => ProfPicPlaceHolder(),
        errorWidget: (context, url, error) => Icon(Icons.error),
      );
    } else {
      return ProfPicPlaceHolder();
    }
  },
),
The problem is that each time the FutureBuilder calls the API to get the image's URL, the URL is different due to the different signature following the filename, so the same image is loaded and cached again and again.
How can I access an image that is stored in S3 from Flask using boto3 and then pass that URL to CachedNetworkImage in Flutter? Is there any other way to cache an image in Flutter from AWS S3?
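One possible server-side workaround (a sketch, assuming it is acceptable to reuse a URL for most of its validity window): cache the presigned URL per file in Flask, so repeated API calls return the same string and the client-side cache key stays stable. The in-process dict is only for illustration; a shared cache would be needed with multiple workers.

import time

import boto3
from botocore.config import Config

BUCKET = "my-bucket"  # hypothetical bucket name
EXPIRES_IN = 3600

s3client = boto3.client(
    "s3", config=Config(signature_version="s3v4", region_name="eu-west-2")
)

# file key -> (url, unix time after which we regenerate it)
_url_cache = {}

def get_stable_presigned_url(key):
    cached = _url_cache.get(key)
    if cached and cached[1] > time.time():
        return cached[0]
    url = s3client.generate_presigned_url(
        "get_object",
        Params={"Bucket": BUCKET, "Key": key},
        ExpiresIn=EXPIRES_IN,
    )
    # Reuse the URL for 90% of its validity so a client never receives
    # one that expires moments later.
    _url_cache[key] = (url, time.time() + EXPIRES_IN * 0.9)
    return url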
-
How to download files from S3 to a custom folder or a network path using boto3
Below is the function to download files from an S3 bucket. The problem is that I can't find how to direct those files to a network path; they download into the project folder, and I have no control over where they end up.
import boto3
import config
import os
import win32api

def download_all_objects_in_folder():
    s3_resource = boto3.resource(
        's3',
        aws_access_key_id=config.AWS_BUCKET_KEY,
        aws_secret_access_key=config.AWS_BUCKET_SECRET_KEY,
    )
    my_bucket = s3_resource.Bucket(config.BUCKET)
    # Create the folder logic here
    objects = my_bucket.objects.filter(Prefix='Export_20181104/')
    for obj in objects:
        path, filename = os.path.split(obj.key)
        my_bucket.download_file(obj.key, filename, "C:\Other")
        # win32api.MessageBox(0, obj.key, 'title')
    print("imports completed")
Update: This is the error I am getting when I pass the custom path.
ValueError: Invalid extra_args key 'C', must be one of: ChecksumMode, VersionId, SSECustomerAlgorithm, SSECustomerKey, SSECustomerKeyMD5, RequestPayer, ExpectedBucketOwner
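That error pins down the bug: the third positional parameter of download_file is ExtraArgs, so "C:\Other" is being parsed as extra S3 arguments rather than as a destination. The destination directory belongs inside the Filename argument instead, e.g. (a sketch reusing my_bucket and objects from the function above):

import os

target_dir = r"C:\Other"  # the custom/network path
for obj in objects:
    _, filename = os.path.split(obj.key)
    # Filename is the full local destination path; no ExtraArgs needed.
    my_bucket.download_file(obj.key, os.path.join(target_dir, filename))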
-
S3 : Put Vs Multipart Notification clarification
Wanted to confirm the behavior I am seeing. I am using Amplify.Storage.uploadFile to upload a file. The file being uploaded can be of any size. It seems like the Amplify SDK decides the mechanism for upload depending upon file size. I am listening to SQS notifications on upload. This is the behavior that I see:
Enable only Multipart upload complete notification:
- Smaller file size: receive nothing
- Larger file size: receive a single ObjectCreated:CompleteMultipartUpload event
Problem: I miss out on the smaller file.
Enable both PUT and Multipart upload complete notification:
- Smaller file size: receive a put event
- Larger file size: receive multiple ObjectCreated:CompleteMultipartUpload events
Problem: I don't know which of the notifications to listen to for the larger file size, and I don't know whether anything is guaranteed about the timing of the multiple notifications. Can I try to read the file and, if the multipart upload has not truly finished, treat the failed download as a signal to ignore that notification?
Thoughts?
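For reference, one way to sidestep choosing between the two notification types: subscribe the queue to s3:ObjectCreated:*, which matches Put, Post, Copy and CompleteMultipartUpload alike, so small and large uploads are captured by a single rule. A boto3 sketch with a hypothetical bucket and queue ARN:

import boto3

s3 = boto3.client("s3")

s3.put_bucket_notification_configuration(
    Bucket="my-upload-bucket",  # hypothetical
    NotificationConfiguration={
        "QueueConfigurations": [
            {
                "QueueArn": "arn:aws:sqs:us-east-1:123456789012:upload-events",
                # The wildcard covers both the PUT and the
                # multipart-complete event types in one subscription.
                "Events": ["s3:ObjectCreated:*"],
            }
        ]
    },
)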
-
Lambda triggers again after timeout even though SQS visibility timeout is the same as Lambda timeout
The Lambda has been triggered again even though the SQS visibility timeout is the same as the Lambda timeout, around 8 min.
Even with the SQS visibility timeout greater than the Lambda timeout, in-flight messages in SQS were being picked up again by Lambdas.
Is there a workaround? We don't have a DLQ.
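For what it's worth, AWS's documented guidance for SQS-triggered Lambdas is a queue visibility timeout of at least six times the function timeout, to give the function time to process each batch and its retries before messages become visible again. A boto3 sketch with a hypothetical queue URL:

import boto3

sqs = boto3.client("sqs")

# 6 x 8 min = 48 min = 2880 seconds.
sqs.set_queue_attributes(
    QueueUrl="https://sqs.us-east-1.amazonaws.com/123456789012/my-queue",
    Attributes={"VisibilityTimeout": "2880"},
)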
-
How to configure a consumer with the @ssut/nestjs-sqs library?
I'm trying to consume messages from an SQS queue but I'm having a hard time configuring it, since the documentation lacks examples. I've created an sqs-queue-module.ts:
@Module({
  imports: [
    SqsModule.register({
      consumers: [
        {
          name: "sqs.myqueuename"
        }
      ]
    })
  ],
  providers: [SqsQueueService],
  exports: [SqsModule]
})
export class SqsQueueModule {}
My SqsQueueService only has a function that handles the arriving message, as the documentation shows:
@Injectable()
export class SqsQueueService {
  @SqsMessageHandler("sqs.myqueuename", false)
  public async handleMessage(message: AWS.SQS.Message) {
    console.log(message);
  }
}
Finally, I import my sqs-queue-module.ts into the app.module.ts. On AWS, I have a message available on the queue to be consumed, but I don't know how to make this message travel into my application. For a 'normal' POST/GET request I create the endpoint through my app.controller.ts, but since this is an SQS queue, maybe it's different.
My doubts are:
- Should the app consume it automatically or should I trigger something?
- Maybe the queue name is wrong; I'm using the URL and not the ARN (I know we use the ARN in Lambda applications)
- Have I configured it properly?
-
What is the behaviour of SQS FIFO queue after visibility timeout expires?
- After the visibility timeout expires, will the message go to the same consumer or a different consumer in a FIFO queue?
- To ensure message ordering, the message has to be returned to the head of the queue when the visibility timeout expires. Is this assumption correct? If not, how is message ordering preserved when the visibility timeout expires? Please point me to any official documentation; I searched the official docs for this information but had no luck.
-
Cascading after_update operations with sqlalchemy not working
We have a parent/child table and want to be able to propagate updates up from the children/leaves to the parent root.
For example, given we have a structure like
{
  id: 1,
  state: "in-progress",
  children: [{
    id: 2,
    state: "in-progress",
    children: [{
      id: 3,
      state: "in-progress"
    }]
  }]
}
When child 3's state gets updated to completed, we want to update the state of its parent 2 and its parent 1 to completed as well. We can do that when it's one level of separation, but for two levels the root node never gets updated. So node 2 will get updated to completed but 1 is stuck in-progress.
Here's what we have:
class OperationNode(BaseModel):
    __tablename__ = "operation_node"

    id = Column(Integer, primary_key=True)

    # child -> parent
    parent_id = Column(
        Integer,
        ForeignKey("operation_node.id", ondelete="CASCADE", onupdate="CASCADE"),
        index=True,
    )

    # Track progress
    state = Column(String(15), default=OperationNodeStates.in_progress.value)

    # List of child objects with eager loading and max depth
    children = relationship(
        "OperationNode",
        foreign_keys=[parent_id],
        cascade="all, delete-orphan",
        lazy="joined",
        join_depth=2,
    )


@event.listens_for(OperationNode, "after_update")
def after_update_trigger_parent_sync(mapper, connection, operation_node):
    insp = db.inspect(operation_node)
    state_history = insp.attrs["state"].load_history()

    # Update aggregated fields of parent when a child has changes
    if operation_node.parent_id \
            and state_history.has_changes():
        operation_node_table = OperationNode.__table__
        connection.execute(
            operation_node_table.update().
            where(operation_node_table.c.id == operation_node.parent_id).
            values(state="completed")
        )
So when we do:
operation_node = OperationNode.query.get(3)
operation_node.state = "completed"
db.session.merge(operation_node)
db.session.commit()
only the direct parent 2 gets updated from the after_update, and not the root 1. The after_update doesn't trigger the second time :(. Would be happy to share more code/context if needed. Are we missing something? Or is there a good example of this same use case out there? Any help is appreciated as we are very stuck!
Using sqlalchemy 1.3.24 and Flask-SQLAlchemy 2.4.4.
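For what it's worth, a likely explanation plus a hedged workaround: after_update is an ORM flush event, and the parent row is updated here through a Core connection.execute(...), which bypasses the ORM entirely, so no second after_update ever fires for node 2. One way around that is to climb the whole ancestor chain inside the first event instead of relying on cascading events. A minimal sketch reusing OperationNode and db from the question:

from sqlalchemy import event, select

@event.listens_for(OperationNode, "after_update")
def after_update_trigger_parent_sync(mapper, connection, operation_node):
    insp = db.inspect(operation_node)
    if not insp.attrs["state"].load_history().has_changes():
        return

    table = OperationNode.__table__
    parent_id = operation_node.parent_id
    # Core UPDATEs never re-enter ORM event hooks, so walk to the root here.
    while parent_id is not None:
        connection.execute(
            table.update()
            .where(table.c.id == parent_id)
            .values(state="completed")
        )
        parent_id = connection.execute(
            select([table.c.parent_id]).where(table.c.id == parent_id)
        ).scalar()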