Running Functions with Multiple Arguments Concurrently and Aggregating Complex Results

Set Up

This is part two of a question that I posted regarding accessing results from multiple processes.
For part one, click here: Link to Part One

I have a complex set of data that I need to compare against various sets of constraints concurrently, but I'm running into two issues: first, getting results out of my multiple processes, and second, getting anything beyond an extremely simple function to run concurrently.

Example

I have multiple sets of constraints that I need to compare against some data, and I would like to do this concurrently because I have a lot of constraint sets. In this example I'll just be using two sets of constraints.

Jupyter Notebook

Create Some Sample Constraints & Data

import pandas as pd

# Create a set of constraints
constraints = pd.DataFrame([['2x2x2', 2,2,2],['5x5x5',5,5,5],['7x7x7',7,7,7]],
                     columns=['Name','First', 'Second', 'Third'])
constraints.set_index('Name', inplace=True)

# Create a second set of constraints
constraints2 = pd.DataFrame([['4x4x4', 4,4,4],['6x6x6',6,6,6],['7x7x7',7,7,7]],
                      columns=['Name','First', 'Second', 'Third'])
constraints2.set_index('Name', inplace=True)

# Create some sample data
items = pd.DataFrame([['a', 2,8,2],['b',5,3,5],['c',7,4,7]], columns=['Name','First', 'Second', 'Third'])
items.set_index('Name', inplace=True)

Running Sequentially

If I run this sequentially I get my desired results, but with the data I'm actually dealing with it can take over 12 hours. Here is what it looks like run sequentially, so you can see my desired result.

# Function
def seq_check_constraint(df_constraints_input, df_items_input):
    df_constraints = df_constraints_input.copy()
    df_items = df_items_input.copy()
    
    df_items['Product'] = df_items.product(axis=1)
    df_constraints['Product'] = df_constraints.product(axis=1)
    
    for constraint in df_constraints.index:
        df_items[constraint+'Product'] = df_constraints.loc[constraint,'Product']
        
    for constraint in df_constraints.index:
        col_name = constraint + '_fits'
        df_items[col_name] = False
        df_items.loc[df_items['Product'] < df_items[constraint+'Product'], col_name] = True

    # Keep only the *_fits columns
    df_res = df_items.iloc[:, 7:]
    return df_res

constraint_sets = [constraints, constraints2]  # ...plus any additional sets
results = {}
counter = 0

for df in constraint_sets:
    res = seq_check_constraint(df, items)
    results['constraints'+str(counter)] = res
    counter += 1

or uglier:

df_res1 = seq_check_constraint(constraints, items)
df_res2 = seq_check_constraint(constraints2, items)

results = {'constraints0':df_res1, 'constraints1': df_res2}
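For reference, the fit test the function performs boils down to comparing dimension products. A minimal sketch of that arithmetic on the sample data (re-created here so the snippet stands alone):

```python
import pandas as pd

# Re-create the sample constraints and items from above
constraints = pd.DataFrame([['2x2x2', 2, 2, 2], ['5x5x5', 5, 5, 5], ['7x7x7', 7, 7, 7]],
                           columns=['Name', 'First', 'Second', 'Third']).set_index('Name')
items = pd.DataFrame([['a', 2, 8, 2], ['b', 5, 3, 5], ['c', 7, 4, 7]],
                     columns=['Name', 'First', 'Second', 'Third']).set_index('Name')

item_products = items.product(axis=1)              # a: 2*8*2 = 32, b: 75, c: 196
constraint_products = constraints.product(axis=1)  # 2x2x2: 8, 5x5x5: 125, 7x7x7: 343

# An item "fits" a constraint when its product is strictly smaller,
# e.g. 'a' (32) fits '5x5x5' (125) but not '2x2x2' (8)
print(item_products['a'] < constraint_products['5x5x5'])  # True
```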

As a result of running these sequentially I end up with DataFrames like the one shown here: here

I'd ultimately like to end up with a dictionary or list of the DataFrames, or to be able to concatenate them all together. The order in which I get the results doesn't matter; I just want them all in one place so I can do further analysis on them.
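One way to pull the per-set results together afterwards is `pd.concat` along the columns. A sketch with placeholder result frames standing in for the real `seq_check_constraint` outputs (the column names here are just illustrative):

```python
import pandas as pd

# Placeholder result frames, one per constraint set, sharing the item index
df_res1 = pd.DataFrame({'2x2x2_fits': [False, True, False]}, index=['a', 'b', 'c'])
df_res2 = pd.DataFrame({'4x4x4_fits': [True, True, False]}, index=['a', 'b', 'c'])
results = {'constraints0': df_res1, 'constraints1': df_res2}

# axis=1 aligns on the shared item index, giving one wide frame of all fit columns
combined = pd.concat(results.values(), axis=1)
print(combined.columns.tolist())  # ['2x2x2_fits', '4x4x4_fits']
```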

What I've Tried

So this brings me to my attempts at multiprocessing. From what I understand, you can use either Queues or Managers to handle shared data and memory, but I haven't been able to get either to work. I'm also struggling to get my function, which takes two arguments, to execute within the Pool at all.

Here is my code as it stands right now using the same sample data from above:

Function

def check_constraint(df_constraints_input, df_items_input):
    df_constraints = df_constraints_input.copy()
    df_items = df_items_input.copy()
    
    df_items['Product'] = df_items.product(axis=1)  # Mathematical Product
    df_constraints['Product'] = df_constraints.product(axis=1)
    
    for constraint in df_constraints.index:
        df_items[constraint+'Product'] = df_constraints.loc[constraint,'Product']
        
    for constraint in df_constraints.index:
        col_name = constraint + '_fits'
        df_items[col_name] = False
        df_items.loc[df_items['Product'] < df_items[constraint+'Product'], col_name] = True

    # Keep only the *_fits columns
    df_res = df_items.iloc[:, 7:]
    return df_res

Jupyter Notebook

import itertools
import multiprocessing as mp

df_manager = mp.Manager()
df_ns = df_manager.Namespace()
df_ns.constraint_sets = [constraints, constraints2]


print('---Starting pool---')

if __name__ == '__main__':
    with mp.Pool() as p:
        print('--In the pool--')
        res = p.map_async(mpf.check_constraint, (df_ns.constraint_sets, itertools.repeat(items)))
        print(res.get())

and my current error:

TypeError: check_constraint() missing 1 required positional argument: 'df_items_input'

1 answer

  • answered 2021-04-14 03:46 Anmol Singh Jaggi

    Easiest way is to create a list of tuples (where one tuple represents one set of arguments to the function) and pass it to starmap.

    df_manager = mp.Manager()
    df_ns = df_manager.Namespace()
    df_ns.constraint_sets = [constraints, constraints2]
    
    
    print('---Starting pool---')
    
    if __name__ == '__main__':
        with mp.Pool() as p:
            print('--In the pool--')
            check_constraint_args = []
            for constraint in df_ns.constraint_sets:
                check_constraint_args.append((constraint, items))
            res = p.starmap(mpf.check_constraint, check_constraint_args)
            print(res)  # starmap returns a plain list, so there is no .get()
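As a side note on why the original `map_async` call raised the `TypeError`: the `map` family passes each element of the iterable as a single positional argument, whereas `starmap` unpacks each tuple into separate arguments. A minimal, self-contained demonstration with plain integers rather than the actual DataFrames:

```python
import multiprocessing as mp

def add(a, b):
    return a + b

if __name__ == '__main__':
    with mp.Pool(2) as p:
        args = [(1, 10), (2, 20)]
        # map calls add((1, 10)) -> TypeError: missing positional argument 'b'
        try:
            p.map(add, args)
        except TypeError as e:
            print('map failed:', e)
        # starmap unpacks each tuple: add(1, 10), add(2, 20)
        print(p.starmap(add, args))  # [11, 22]
```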