How to assign a task to a specific thread in C++


I am currently writing a stream parser that parses multiple feeds that are coming in very fast. Let's assume that it is a Twitter stream of tweets from X accounts. I am trying to make the processing as concurrent as possible, while also making sure each account's tweets are processed sequentially.

Each tweet requires some parsing that takes some time. If I were to use a naive thread pool, tweets assigned to threads that finish sooner could overtake earlier ones, so a single account's tweets may be logged out of order.

This task can be approached using a producer-consumer model, where in this case there is only one producer: the Twitter feed. The consumers are where I am uncertain.

The Approach

My idea to tackle this is fairly simple: map each account to a bucket numbered between 1 and T, where T is the number of threads available on my computer. Then process each bucket sequentially. This way all buckets can be run concurrently, and no single account's tweets will be logged out of order.
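A minimal sketch of the mapping I have in mind. The names `bucket_for` and `default_bucket_count` are hypothetical, and I'm assuming accounts are identified by a string. Buckets are zero-based here (0 to T-1) so they can index an array directly.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <thread>

// Hypothetical helper: pick T from the hardware, falling back to 1.
std::size_t default_bucket_count() {
    // hardware_concurrency() may return 0 when the value cannot be determined.
    return std::max<std::size_t>(1, std::thread::hardware_concurrency());
}

// Hypothetical helper: map an account to a bucket index in [0, num_buckets).
// The same account always lands in the same bucket within one run.
std::size_t bucket_for(const std::string& account, std::size_t num_buckets) {
    assert(num_buckets > 0);
    return std::hash<std::string>{}(account) % num_buckets;
}
```

Several accounts can share a bucket; what matters is that one account never spans two buckets.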

Here is a crude visualization of what that looks like with two threads and three accounts:

[Image: diagram of three accounts mapped onto two threads]

As you can see, since we have two threads, Accounts 1 and 3 map to the same thread but maintain internal consistency. Threads 1 & 2 can run concurrently with no conflicts ever arising.

This structure is also very extendable. If I have more producers with Accounts 4 & 5, for example, I can still add them to threads 2 and 1, respectively, without losing internal account consistency.
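To make the intended behavior concrete, here is a rough sketch of what I imagine: one worker thread per bucket, each with its own FIFO queue, instead of one shared pool. `SerialWorker` and `BucketedExecutor` are names I made up for illustration, accounts are assumed to have numeric ids, and the sketch assumes C++14 for `std::make_unique`. I'm not claiming this is the right design, only that it shows the behavior I'm after.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical: a single thread that runs its queued tasks in FIFO order.
class SerialWorker {
public:
    SerialWorker() : thread_([this] { run(); }) {}

    ~SerialWorker() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        cv_.notify_one();
        thread_.join();  // the worker drains its queue before exiting
    }

    void post(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        cv_.notify_one();
    }

private:
    void run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return stopping_ || !tasks_.empty(); });
                if (tasks_.empty()) return;  // stopping and nothing left to do
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // run outside the lock so post() is never blocked by work
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> tasks_;
    bool stopping_ = false;
    std::thread thread_;  // declared last so the members above exist before it starts
};

// Hypothetical: routes every task for an account to the worker owning its bucket.
class BucketedExecutor {
public:
    explicit BucketedExecutor(std::size_t num_buckets) {
        assert(num_buckets > 0);
        for (std::size_t i = 0; i < num_buckets; ++i)
            workers_.push_back(std::make_unique<SerialWorker>());
    }

    void post(std::size_t account_id, std::function<void()> task) {
        workers_[account_id % workers_.size()]->post(std::move(task));
    }

private:
    std::vector<std::unique_ptr<SerialWorker>> workers_;
};
```

With a single producer calling `post`, tasks for one account enter one queue in arrival order and are run by one thread, so they cannot overtake each other, while different buckets still run in parallel.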

What I've Done So Far/The Code

I'm not sure how to structure this programmatically. I'm fairly new to multi-threading in C++ so I'm using modified code from this blog post as a way to structure my file.

I'd take a read-through if you have the time, but basically my code is a minimal example replicating this process. There are 5 buckets. The tweet parsing is simulated by sleeping for 1 second. I assign each task a mutex to lock based on its bucket. This is done using a simple mutex = mutex_map[task_id % NUM_BUCKETS].
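A stripped-down sketch of that locking scheme, not my full code. `process_task` and `in_flight` are names invented for this illustration, and the sleep duration is a parameter so it can be shortened from the 1 second I use.

```cpp
#include <array>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <mutex>
#include <thread>

constexpr std::size_t NUM_BUCKETS = 5;

std::array<std::mutex, NUM_BUCKETS> mutex_map;  // one mutex per bucket
std::array<int, NUM_BUCKETS> in_flight{};       // each entry guarded by the matching mutex

// Simulated parse: holds the bucket's mutex for the whole duration of the work.
void process_task(std::size_t task_id, std::chrono::milliseconds work) {
    const std::size_t bucket = task_id % NUM_BUCKETS;
    std::lock_guard<std::mutex> lock(mutex_map[bucket]);
    ++in_flight[bucket];
    assert(in_flight[bucket] == 1);  // never two tasks of one bucket at once
    std::this_thread::sleep_for(work);
    --in_flight[bucket];
}
```

The mutex guarantees that tasks in the same bucket never overlap. It says nothing about which waiting task acquires it next, because `std::mutex` makes no fairness or ordering promise.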

The code is available here, although the VM is limited to 2 threads. If we scale up to 11 threads (on my machine), we run into race conditions where some threads beat the others. Essentially what happens is this:

  • The machine has 11 threads initially available.

  • It assigns Task 0, 5, and 10 to some threads in the thread pool.

  • Task 0 goes first, acquires the mutex, and locks it. Tasks 5 and 10 wait because the mutex for bucket 0 is locked.

  • Once Task 0 is finished, sometimes Task 10 goes first, and sometimes Task 5 goes first.

Now, one workaround is to just limit the thread pool size to NUM_BUCKETS, but that sidesteps the core problem I'm trying to solve: the behavior I want in the background is not actually being implemented.


Does anyone have suggestions on how to approach this? How do I enforce consistency within a bucket? I basically want to assign each task to a specific thread based on the hash. I'm not sure how to do so, as the thread pool is what manages this for me...