python 3.6 asyncio error not iterable while not iterating through async object

I have a class that create a url and some json to execute in a post method that looks like that and I was following this guide

import vk_api
from vk_api.execute import VkFunction
import time
from datetime import datetime
import numpy as np
import asyncio
from ratelimit import limits
import requests
import aiohttp



class Execute:  

    def __init__(self, access_token):
        self.access_token = access_token

    def posts_to_push(self, posts, limit):
        arr = []
        data = list(self.posts_chunks_limit(posts, limit))
        for i in range(len(data)):
            code = f"data.push(API.wall.getById( {{'posts': {data[i]} }} )); "
            arr.append(code)
        return arr  

    def posts_execute_command(self, posts):  # TODO make async
        limit = 100
        code = self.posts_to_push(posts, limit)
        execute_limit = 25
        for i in range(len(code)):
            data = ''.join(code[i * execute_limit: (i * execute_limit) + execute_limit])
            var = f'var data = []; {data} return data ;'
            yield var


    async def fetch(url, json_data, session):
        async with session.post(url, json=json_data) as response:
            return await response.read()

    async def result_posts(self, posts):
        result = []
        command = self.posts_execute_command(posts)
        async with aiohttp.ClientSession() as session:
            for i in command:
                execute = asyncio.ensure_future(self.fetch(url="https://api.vk.com/method/execute",
                                        json_data={
                                            "code": i,
                                            "access_token": self.access_token,
                                            "v": 5.101,
                                        }), session)
                result.append(execute)
        responses = await asyncio.gather(*result)
        print(responses)

    async def posts_chunks_limit(self, data, limit):
        """Yield successive n-sized chunks from l."""
        for i in range(0, len(data), limit):
            await asyncio.sleep(0.1)
            yield data[i:i + limit]

    def run_async(self, posts):
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.result_posts(posts))

and then i run it like this

df = pd.read_csv('/some_path')
arr = []
for i in df['ids']:
    arr.append(i)

loop = asyncio.get_event_loop()
future = asyncio.ensure_future(vk.result_posts(arr))
loop.run_until_complete(future)

error message looks like this

Traceback (most recent call last):
  File "../test_python.py", line 83, in <module>
    loop.run_until_complete(future)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
    return future.result()
  File "../test_python.py", line 45, in result_posts
    for i in command:
  File "../test_python.py", line 29, in posts_execute_command
    code = self.posts_to_push(posts, limit)
  File "../test_python.py", line 21, in posts_to_push
    data = list(self.posts_chunks_limit(posts, limit))
TypeError: 'async_generator' object is not iterable

This is my frist time using aiohttp/asyncio, I find it quite complicated and easy to get lost, may be I can get some directions or solutions in my case ?

2 answers

  • answered 2019-07-19 07:02 user4815162342

    In this line:

            data = list(self.posts_chunks_limit(posts, limit))
    

    As post_chunks_limit is an async iterator, list doesn't know what to do with it. You need to iterate over it with async for or with an async list comprehension:

            data = [x async for x in self.posts_chunks_limit(posts, limit)]
    

    This requires, posts_to_push and posts_execute_command to be defined with async def. Also posts_execute_command must await the call to posts_to_push and result_posts needs to await the call to posts_execute_command.

  • answered 2019-07-19 14:47 nexla

    With the help of @user4815162342 and bunch of SO posts, I was able to fix my issue and my code looks like this. Issue was I was calling/awaiting a generator which would not be iterable in my result_postsmethod.

    import vk_api
    from vk_api.execute import VkFunction
    import time
    from datetime import datetime
    import numpy as np
    import asyncio
    from ratelimit import limits
    import requests
    import aiohttp
    import socket
    from concurrent.futures import ThreadPoolExecutor
    
    
    
    class Execute:  # TODO auth, parsers, limits, timeouts
    
        def __init__(self, access_token):
            self.access_token = access_token
    
        async def posts_to_push(self, posts, limit):
            arr = []
            data = [x async for x in self.posts_chunks_limit(posts, limit)]
            for i in range(len(data)):
                code = f"data.push(API.wall.getById( {{'posts': {data[i]} }} )); "
                arr.append(code)
            return arr  # < len() = 1000, 1k lists with 100 post IDs inside for 100k total ids
    
    
        async def posts_execute_command(self, posts):  # TODO make async
            limit = 100
            code = await self.posts_to_push(posts, limit)
            execute_limit = 25
            for i in range(len(code)):
                data = ''.join(code[i * execute_limit: (i * execute_limit) + execute_limit])
                var = f'var data = []; {data} return data ;'
                print(var, '---var---')
                yield var
    
        async def fetch(self, url, json_data, session):
    
            async with session.post(url, data=json_data) as response:
                return await response.read()
    
        @limits(calls=1, period=1)
        async def result_posts(self, posts):
            result = []
            command = [i async  for i in self.posts_execute_command(posts) ] #<note this iteration
            conn = aiohttp.TCPConnector(
            family=socket.AF_INET,
            verify_ssl=False,)
            async with aiohttp.ClientSession(connector=conn) as session:
                for i in command:
                    print('---code---', len(command)) #TODO fix command range that's the bug
                    execute = asyncio.ensure_future(self.fetch(url="https://api.vk.com/method/execute",
                                            json_data={
                                                "code": i,
                                                "access_token": self.access_token,
                                                "v": 5.101,
                                            }, session = session))
                    await asyncio.sleep(1)
                    result.append(execute)
            responses = await asyncio.gather(*result)
            print(responses, 'responses')
            return 'Done'
    
    
        async def posts_chunks_limit(self, data, limit):
            """Yield successive n-sized chunks from l."""
            for i in range(0, len(data), limit):
                yield data[i:i + limit]