How to read lines from a file asynchronously and work with them in gather?
I'm trying to replace a list of generated numbers with numbers read asynchronously from a file. How can I do this correctly?
from asyncio import get_event_loop, gather, sleep

import aiofiles

async def read_file():
    async with aiofiles.open('test.txt', mode='rb') as f:
        async for line in f:
            yield line

async def main(k):
    print(k)
    await sleep(1)

if __name__ == '__main__':
    count_group = 3
    list_objects = list()
    for i in range(1, 11):
        list_objects.append(i)

    loop = get_event_loop()

    # TODO How to correctly replace list_objects with read_file()
    list_func = [main(x) for x in list_objects]
    run_groups = [list_func[i:i + count_group] for i in range(0, len(list_func), count_group)]
    for rg in run_groups:
        loop.run_until_complete(gather(*rg))
I tried different options, but none of them worked. My goal is to asynchronously read lines from a file and work with them.
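One direction that seems to fit (a minimal sketch, assuming Python 3.7+ and an existing test.txt; untested against the original setup) is to consume the async generator with async for, buffer main(...) coroutines into groups of count_group, and gather() each group as it fills:

from asyncio import gather, run, sleep

import aiofiles

async def read_file(path):
    async with aiofiles.open(path, mode='rb') as f:
        async for line in f:
            yield line

async def main(k):
    print(k)
    await sleep(1)

async def runner(path, count_group=3):
    batch = []
    async for line in read_file(path):
        batch.append(main(line))
        if len(batch) == count_group:
            await gather(*batch)  # run this group concurrently
            batch = []
    if batch:  # leftover lines that didn't fill a full group
        await gather(*batch)

if __name__ == '__main__':
    run(runner('test.txt'))

The key point is that an async generator can only be consumed with async for (or anext), so the batching has to happen inside a coroutine rather than in a top-level list comprehension.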
See also questions close to this topic
-
Python File Tagging System does not retrieve nested dictionaries in a dictionary
I am building a file tagging system using Python. The idea is simple. Given a directory of files (and files within subdirectories), I want to filter them using a filter input and tag matching files with a word or a phrase.
If I have the following contents in my current directory:

data/
    budget.xls
    world_building_budget.txt
a.txt
b.exe
hello_world.dat
world_builder.spec
and I execute the following command in the shell:
py -3 tag_tool.py -filter=world -tag="World-Building Tool"
My output will be:
These files were tagged with "World-Building Tool":

data/world_building_budget.txt
hello_world.dat
world_builder.spec
My current output isn't exactly like this but basically, I am converting all files and files within subdirectories into a single dictionary like this:
def fs_tree_to_dict(path_):
    file_token = ''
    for root, dirs, files in os.walk(path_):
        tree = {d: fs_tree_to_dict(os.path.join(root, d)) for d in dirs}
        tree.update({f: file_token for f in files})
        return tree
Right now, my dictionary looks like this:

key: ''

In the following function, I am turning the empty values ('') into empty lists (to hold my tags):

def empty_str_to_list(d):
    for k, v in d.items():
        if v == '':
            d[k] = []
        elif isinstance(v, dict):
            empty_str_to_list(v)
When I run my entire code, this is my output:
hello_world.dat ['World-Building Tool']
world_builder.spec ['World-Building Tool']
But it does not see data/world_building_budget.txt. This is the full dictionary:

{'data': {'world_building_budget.txt': []}, 'a.txt': [], 'hello_world.dat': [], 'b.exe': [], 'world_builder.spec': []}
This is my full code:
import os, argparse

def fs_tree_to_dict(path_):
    file_token = ''
    for root, dirs, files in os.walk(path_):
        tree = {d: fs_tree_to_dict(os.path.join(root, d)) for d in dirs}
        tree.update({f: file_token for f in files})
        return tree

def empty_str_to_list(d):
    for k, v in d.items():
        if v == '':
            d[k] = []
        elif isinstance(v, dict):
            empty_str_to_list(v)

parser = argparse.ArgumentParser(description="Just an example",
                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--filter", action="store", help="keyword to filter files")
parser.add_argument("--tag", action="store", help="a tag phrase to attach to a file")
parser.add_argument("--get_tagged", action="store", help="retrieve files matching an existing tag")
args = parser.parse_args()

filter = args.filter
tag = args.tag
get_tagged = args.get_tagged

current_dir = os.getcwd()
files_dict = fs_tree_to_dict(current_dir)
empty_str_to_list(files_dict)

for k, v in files_dict.items():
    if filter in k:
        if v == []:
            v.append(tag)
        print(k, v)
    elif isinstance(v, dict):
        empty_str_to_list(v)
        if get_tagged in v:
            print(k, v)
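The top-level for loop only ever inspects the first level of files_dict, so anything nested under 'data' is never compared against the filter. A minimal sketch of a recursive walk that also visits nested dictionaries (tag_files is a hypothetical helper, not part of the original code):

def tag_files(d, filter_word, tag_phrase, prefix=''):
    for k, v in d.items():
        path = os.path.join(prefix, k)
        if isinstance(v, dict):
            # descend into the subdirectory's dictionary
            tag_files(v, filter_word, tag_phrase, path)
        elif filter_word in k:
            v.append(tag_phrase)
            print(path, v)

Called as tag_files(files_dict, filter, tag), this would also reach data/world_building_budget.txt.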
-
Actually, I am working on a project and it is showing "no module named pip_internal". Please help me with this.
File "C:\Users\pjain\AppData\Local\Programs\Python\Python310\lib\runpy.py", line 196, in _run_module_as_main return _run_code(code, main_globals, None, File "C:\Users\pjain\AppData\Local\Programs\Python\Python310\lib\runpy.py", line 86, in _run_code exec(code, run_globals) File "C:\Users\pjain\AppData\Local\Programs\Python\Python310\Scripts\pip.exe\__main__.py", line 4, in <module> File "C:\Users\pjain\AppData\Local\Programs\Python\Python310\lib\site-packages\pip\_internal\__init__.py", line 4, in <module> from pip_internal.utils import _log
I am using PyCharm with the conda interpreter.
-
Looping the function if the input is not a string
I'm new to Python (first of all). I have a homework assignment: write a function that checks whether an item exists in a dictionary or not.
inventory = {"apple": 50, "orange": 50, "pineapple": 70, "strawberry": 30}

def check_item():
    x = input("Enter the fruit's name: ")
    if not x.isalpha():
        print("Error! You need to type the name of the fruit")
    elif x in inventory:
        print("Fruit found:", x)
        print("Inventory available:", inventory[x], "KG")
    else:
        print("Fruit not found")

check_item()
I want the function to loop again only if the input written is not a string. I've tried putting a return under print("Error! You need to type the name of the fruit"), but that didn't work. Help!
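A minimal sketch of one way to do this (assuming the inventory dict above): wrap only the prompt in a while loop, so it repeats until the input is alphabetic:

def check_item():
    while True:
        x = input("Enter the fruit's name: ")
        if x.isalpha():
            break  # valid input, stop looping
        print("Error! You need to type the name of the fruit")
    if x in inventory:
        print("Fruit found:", x)
        print("Inventory available:", inventory[x], "KG")
    else:
        print("Fruit not found")

check_item()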
-
AsyncIO coroutines with different IPs
I am building a Python bot to check thousands of JSON URLs and gather the information I need from those JSONs.

My problem at the moment is speed, mainly because the server that hosts these URLs blocks my bot for loading too many of its links too fast.

I am using asyncio coroutines to speed up the gathering process, but they all use my IP to access those URLs.

My question is: is there a way to manage the IPs used by the different coroutines? Sending the requests from different IPs would allow me to send them faster for every coroutine.

Another question I have: where do I find, and how do I gather, all the IPs I would need for the different coroutines?
thank you
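For the first question, a common pattern is to route each request through a different outbound proxy rather than binding coroutines to local IPs. A minimal sketch with aiohttp, which accepts an HTTP proxy per request via the proxy= argument (PROXIES is a hypothetical list you would have to supply yourself, e.g. from a proxy provider):

import asyncio

import aiohttp

PROXIES = ["http://proxy1:8080", "http://proxy2:8080"]  # hypothetical endpoints

async def fetch(session, url, proxy):
    async with session.get(url, proxy=proxy) as resp:
        return await resp.json()

async def main(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url, PROXIES[i % len(PROXIES)])
                 for i, url in enumerate(urls)]
        return await asyncio.gather(*tasks)

Note that rotating IPs to evade a host's rate limiting may violate its terms of service; the safer fix is usually to throttle the bot to the allowed rate.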
-
Running Async task with aiohttp web server using asyncio
I have a small web server on which I'm trying to run async functions on web requests:
from aiohttp import web
import asyncio

async def funcB():
    while True:
        print("running")

async def funcA(req):
    print('start')
    asyncio.Task(funcB())
    print('execute task completed')
    return web.Response(text="OK Start")

if __name__ == '__main__':
    app = web.Application()
    app.add_routes([web.get('/', funcA)])
    web.run_app(app, host='127.0.0.1', port=2000)
Once the app is running and I trigger it with curl http://127.0.0.1:2000/, I do get the "execute task completed" log line, but the curl response is never received. If I comment out the asyncio.Task(funcB()) line, I do get the response to the curl command. I should mention that funcB does run. What am I missing here?
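The likely culprit (a sketch of one fix, not a confirmed diagnosis): funcB's while True loop never awaits anything, so it never yields control back to the event loop, and the response from funcA can never be written. Adding an await inside the loop fixes it:

async def funcB():
    while True:
        print("running")
        await asyncio.sleep(1)  # yield to the event loop so other tasks can run

async def funcA(req):
    print('start')
    asyncio.create_task(funcB())  # create_task() is the modern spelling of asyncio.Task()
    print('execute task completed')
    return web.Response(text="OK Start")

Because asyncio is cooperatively scheduled, any coroutine that loops without awaiting starves every other task on the loop.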
-
Mac web requests hanging after thousands of requests
I'm facing a very strange issue and am looking for advice on how to debug it more than I am on a simple fix, since I've been unable to create a simple reproducible case.
Over the course of a few hours I'm opening 10,000-100,000 async requests to remote web domains with httpx. Specifically I'm using a context manager around each request in order to (hopefully) release the resources that I'm allocating in the case of an exception. I'll only have a few thousand open at any one time. My code at its base is doing the following:
async def run_request(url):
    try:
        async with AsyncClient() as client:
            response = await client.get(url, timeout=15)
    except Exception as e:
        return None  # the original had 'continue' here, which is invalid outside a loop

with ProcessPoolExecutor() as executor:
    await gather(
        *[
            asyncio.get_event_loop().run_in_executor(
                executor, partial(run_request, url=url)
            )
            for url in urls
        ]
    )
Sometimes the except branch fires, in the case of a timeout or an inaccessible host.
At some point my whole machine will hang when trying to create new connections. Chrome freezes, a locally hosted Postgres instance freezes, even lsof -i -a freezes. Yet none of them actually time out; they just spin forever. It seems as if the OS is unable to allocate new sockets to communicate with remote hosts, but I'm not sure if that explains the Postgres or lsof behavior. Is it possible socket opens are being leaked and not released, despite the context manager? Has anyone seen something similar? What profiling methods should I explore to determine the root cause?
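One mitigation worth trying (a sketch, not a confirmed diagnosis): reuse a single AsyncClient so connections are pooled, and cap concurrency with a semaphore so the OS never has thousands of sockets in flight at once:

import asyncio

from httpx import AsyncClient

async def run_all(urls, max_concurrency=100):
    sem = asyncio.Semaphore(max_concurrency)

    async with AsyncClient() as client:  # one shared connection pool
        async def one(url):
            async with sem:  # at most max_concurrency requests in flight
                try:
                    return await client.get(url, timeout=15)
                except Exception:
                    return None

        return await asyncio.gather(*(one(url) for url in urls))

For diagnosis, watching the open-socket count over time (e.g. netstat -an | wc -l on macOS) would show whether sockets accumulate in states like CLOSE_WAIT before the hang.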
-
ValueError: binary mode doesn't take an encoding argument | error in pickle
I am using this code to read a file with pickle, but inside an async function:
async with aiofiles.open("owners.pkl", mode="rb") as file:
    owner_dict = pickle.loads(await file.read())
But I get this error:

owner_dict = pickle.loads(await file.read())
_pickle.UnpicklingError: invalid load key, '\xef'.
I just want a valid way to open files and read from or write to them with pickle inside async/await functions.
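For reference, a minimal round-trip sketch that writes the pickle with aiofiles first and then reads it back (the '\xef' load key usually means the bytes on disk are not pickle data at all; 0xEF is the first byte of a UTF-8 BOM, which suggests the file was written as text):

import asyncio
import pickle

import aiofiles

async def save(obj, path="owners.pkl"):
    async with aiofiles.open(path, mode="wb") as f:
        await f.write(pickle.dumps(obj))  # serialize in memory, write bytes asynchronously

async def load(path="owners.pkl"):
    async with aiofiles.open(path, mode="rb") as f:
        return pickle.loads(await f.read())

async def main():
    await save({"owner": 123})
    print(await load())  # {'owner': 123}

asyncio.run(main())

The aiofiles pattern in the question is itself valid; the error points at the file's contents rather than at the async reading code.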
-
Redirect asyncio subprocess to file
I would like to execute a long-running process via asyncio.create_subprocess_exec and redirect its output to a file. The file should also be closed when the process finishes, and I would like to avoid creating a separate thread that waits for the process to finish. I'm trying to read the docs, but "a file-like object representing a pipe to be connected to the subprocess's standard input stream using connect_write_pipe()" is very enigmatic for someone from outside the Python duck-typing world. Is there any way to achieve this (maybe some external asyncio util lib)?
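One approach that appears to need neither pipes nor a watcher thread (a sketch, assuming a regular file is an acceptable target): create_subprocess_exec accepts any object with a real file descriptor for stdout, so the child process writes straight to the file, and the with block closes it once the process has been awaited:

import asyncio

async def run_to_file(cmd, path):
    with open(path, "wb") as out:  # the child inherits this file descriptor
        proc = await asyncio.create_subprocess_exec(*cmd, stdout=out)
        await proc.wait()  # file closes when the with block exits

asyncio.run(run_to_file(["ls", "-l"], "output.txt"))  # example command

Since the OS writes into the file directly, no data passes through the event loop, which sidesteps the connect_write_pipe() machinery entirely.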