How to configure Filter with specific message type?

I want to consume messages only with specific type and properties set. A sort of message content filter before any consumer instance created.

I'm trying to create a filter for specific ConsumeContext:

public class OrderFilter : IFilter<ConsumeContext<CreateOrderMessage>>
{
    public Task Send(ConsumeContext<CreateOrderMessage> context, IPipe<ConsumeContext<CreateOrderMessage>> next)
    {
        if (context.Message.IsTrustedUser)
        {
            return next.Send(context); // continue processing
        }
        return Task.CompletedTask; // stop message processing
    }

    public void Probe(ProbeContext context) { }
}

How can I register such a filter?

I've tried to register it in the endpoint but with no luck. I have

cfg.ReceiveEndpoint("OrderQueue", ep =>
{
    ep.UseFilter(new OrderFilter());
    ep.Consumer<CreateOrderConsumer>();
});

I have the following error: Cannot convert instance argument type '{MassTransit.IReceiveEndpointConfigurator,MassTransit.RabbitMqTransport.IRabbitMqReceiveEndpointConfigurator}' to 'GreenPipes.IPipeConfigurator<MassTransit.ConsumeContext<Core.CreateOrderMessage>>'

1 answer

  • answered 2020-10-29 12:31 Chris Patterson

    So, there used to be an extension method for this purpose, but I can't find it. You can add the filter prior to the consumer being created by creating a filter specification and adding it as shown below.

    var filter = new OrderFilter();
    var specification = new FilterPipeSpecification<ConsumeContext< CreateOrderMessage >>(filter);
    ep.AddPipeSpecification(specification);
    

    If you want to execute the filter after the consumer has been created (for instance, if you're using container scope to share information), you can use a scope consume filter (which is described in several answers, as well as the documentation) or you can add your filter during consumer configuration.

    ep.Consumer<CreateOrderConsumer>(cc =>
    {
        cc.Message<CreateOrderMessage>(mc => mc.UseFilter(new OrderFilter()));
    }