Rust ownership in loops

I'm trying to implement RabbitMQ send/listen functionality in Rust, and I have the following code:

struct RabbitMQ {
    connection: Connection,
}

impl RabbitMQ {
    fn connect() -> Self {
        RabbitMQ {
           connection: the created connection
        }
    }
}

impl MessageBroker for RabbitMQ {
    async fn publish(&self, topic: &Topic) -> Result<PublisherConfirm, Error> {
        let channel = self.connection.create_channel().await.unwrap();

        RabbitMQ::create_exchange(&channel, &topic.exchange).await;

        let payload = topic.message.as_bytes();

        let res = channel.basic_publish(
            topic.exchange.name.as_str(),
            topic.queue.routing_key.as_str(),
            topic.exchange.publish_options,
            payload.to_vec(),
            BasicProperties::default(),
         );

        res.await
    }
}

So far so good!

Now I want to publish many messages in a for loop without waiting for the confirmation from the server. The problem is that when I spawn a tokio async task, I need to move my broker value into it, which makes it invalid for the next iteration of the loop:

let broker = RabbitMQ::connect(&connection_details).await;

for x in 1..10 {
    tokio::spawn(async move {
        let confirm = broker.publish(&my_topic).await.unwrap();
    }).await.unwrap();
}

The above code won't compile with the following error:

 error[E0382]: use of moved value: `broker`
  --> src/main.rs:47:33
   |
21 |       let broker = RabbitMQ::connect(&connection_details).await;
   |           ------ move occurs because `broker` has type `message_broker::RabbitMQ`, which does not implement the `Copy` trait
...
47 |           tokio::spawn(async move {
   |  _________________________________^
48 | |             let confirm = &broker.publish(&enable_cdn).await.unwrap();
   | |                            ------ use occurs due to use in generator
49 | |         }).await.unwrap();
   | |_________^ value moved here, in previous iteration of loop

I can't implement the Copy trait because Connection isn't a primitive type, and it seems that I can't use a reference (&) to the broker either.

My question is how can I accomplish this without writing n publish calls?

1 answer

  • answered 2020-09-24 14:00 Masklinn

    You're using an async move block, which means any name which is used in the block is moved into the future, regardless of the operations being performed. So writing

    &broker.publish
    

    inside the block makes no difference: first broker is moved, and the future (when polled with .await) takes an internal reference to it. So what you need to do is borrow outside the block then move that borrow inside:

    let broker = RabbitMQ::connect(&connection_details).await;
    
    for x in 1..10 {
        let broker = &broker;
        tokio::spawn(async move {
            let confirm = broker.publish(&enable_cdn).await.unwrap();
        }).await.unwrap();
    }
    

    But I think that's not going to work either: tokio::spawn is not scoped, so even though you're awaiting it, the compiler has no way of knowing that the task will not outlive broker. As far as it's concerned, a tokio task can live as long as it wants. This means you will probably now get a lifetime error: tokio::spawn requires the future to be 'static, and a borrow of a local variable is not (the compiler assumes the borrow can outlive the enclosing function, and thus its origin).
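
    To illustrate what "scoped" means here, this is a minimal sketch using std::thread::scope from the standard library rather than tokio. Broker is a hypothetical stand-in for the RabbitMQ struct from the question. Because the scope guarantees every spawned thread finishes before it returns, the plain borrow is accepted:

```rust
use std::thread;

// Hypothetical stand-in for the RabbitMQ broker from the question.
struct Broker {
    name: String,
}

impl Broker {
    // Takes &self, like `publish` in the question.
    fn publish(&self, message: &str) -> String {
        format!("{} published {}", self.name, message)
    }
}

// Every thread borrows the same broker; nothing is moved out of the caller.
fn publish_scoped(broker: &Broker, count: usize) -> Vec<String> {
    thread::scope(|s| {
        // Spawn all threads first, keeping their handles.
        let handles: Vec<_> = (0..count)
            .map(|i| s.spawn(move || broker.publish(&format!("message {}", i))))
            .collect();
        // All threads are joined before the scope returns,
        // which is why the borrow is allowed.
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}

fn main() {
    let broker = Broker { name: "broker".to_string() };
    for line in publish_scoped(&broker, 3) {
        println!("{}", line);
    }
    // The broker is still usable here because it was only borrowed.
    println!("{} is still available", broker.name);
}
```

    tokio::spawn gives no such guarantee, which is why the same borrow is rejected there.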

    An easy solution would be to put the Connection (or the whole broker) behind an Arc and give each task its own clone of the Arc.
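
    A minimal sketch of the Arc approach, under some assumptions: Broker is a hypothetical stand-in for the RabbitMQ struct, and std::thread::spawn is used in place of tokio::spawn so the example runs with the standard library alone. Both require the spawned work to be 'static, so the ownership pattern should carry over:

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for the RabbitMQ broker from the question.
struct Broker {
    name: String,
}

impl Broker {
    // Takes &self, like `publish` in the question.
    fn publish(&self, message: &str) -> String {
        format!("{} published {}", self.name, message)
    }
}

// Spawns one task per message; each task owns its own Arc clone.
fn publish_all(broker: Arc<Broker>, count: usize) -> Vec<String> {
    let mut handles = Vec::new();
    for i in 0..count {
        // Cloning the Arc only bumps a reference count; the Broker is not copied.
        let broker = Arc::clone(&broker);
        // The clone is moved into the task; the original stays usable in the loop.
        handles.push(thread::spawn(move || broker.publish(&format!("message {}", i))));
    }
    // Wait for the tasks only after all of them have been started.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let broker = Arc::new(Broker { name: "broker".to_string() });
    for line in publish_all(broker, 3) {
        println!("{}", line);
    }
}
```

    With tokio, the same shape would be an Arc::clone before each tokio::spawn(async move { ... }), with the shadowed clone used inside the block.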

    Alternatively, restructure your system to work better with the requirements of RabbitMQ. I don't know which client library you're using, but amiquip states that connections are thread-safe, and that channels, while not thread-safe, can be sent to other threads.

    So rather than publishing through an implicit connection, create a channel in each iteration of the loop and move that channel into the task, which then performs the actual publication.
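
    The channel-per-task structure could look roughly like the sketch below. Connection and Channel here are hypothetical stand-ins, not the types of amiquip or any other crate, and std::thread::spawn again stands in for tokio::spawn. The point is only the ownership flow: the connection is borrowed to create a channel, and the channel is what gets moved.

```rust
use std::thread;

// Hypothetical stand-ins for a client library's connection and channel types.
struct Connection {
    host: String,
}

struct Channel {
    id: usize,
    host: String,
}

impl Connection {
    // Opening a channel only borrows the connection.
    fn create_channel(&self, id: usize) -> Channel {
        Channel { id, host: self.host.clone() }
    }
}

impl Channel {
    fn publish(&self, message: &str) -> String {
        format!("channel {} on {} published {}", self.id, self.host, message)
    }
}

fn publish_on_channels(connection: &Connection, count: usize) -> Vec<String> {
    let mut handles = Vec::new();
    for i in 0..count {
        // Created on the current thread, so the connection itself never moves.
        let channel = connection.create_channel(i);
        // The task owns its channel outright, so no borrow crosses into it.
        handles.push(thread::spawn(move || channel.publish(&format!("message {}", i))));
    }
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let connection = Connection { host: "localhost".to_string() };
    for line in publish_on_channels(&connection, 3) {
        println!("{}", line);
    }
}
```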

    Also,

    Now I want to publish many messages in a for loop without waiting for the confirmation from the server

    aren't you still waiting on each message, since you await the result of tokio::spawn before starting the next iteration?