Ausgabe
Das Problem ist, dass ich eine asynchrone Methode/Bibliothek wie folgt erstellen muss (damit sie die Asyncio-Ereignisschleife nicht blockiert):
- Future aus der zu definierenden asynchronen Methode erstellen (sagen wir es methodA)
- Setzen Sie die Zukunft in eine Warteschlange / Liste / Diktat und ein Dienst füllt das Ergebnis in die Zukunft, wenn es verfügbar ist (es dauert lange, bis das Ergebnis verfügbar ist).
- Erwarten Sie die Zukunft in der MethodeA
Das Problem, das ich habe, ist das Warten auf die geschaffene Zukunft, die für immer blockiert ist, wie das vereinfachte Beispiel unten zeigt.
import asyncio
from asyncio import Future
from queue import Queue
from threading import Thread
futures_queue: Queue[Future] = Queue()
def fill_result_service():
counter = 0
while True:
fut = futures_queue.get()
print(f"Processing fut={id(fut)}")
fut.set_result(f"OK: {counter}")
counter += 1
filler_thread = Thread(target=fill_result_service)
filler_thread.start()
async def main_not_ok():
fut: Future[str] = asyncio.get_running_loop().create_future()
print(f"Putting fut={id(fut)} into queue")
futures_queue.put(fut)
result = await fut
assert result.startswith("OK")
print("main_not_ok() completed")
async def main_ok():
fut: Future[str] = asyncio.get_running_loop().create_future()
tmp_thread = Thread(target=lambda: fut.set_result("OK: Local thread"))
tmp_thread.start()
result = await fut
assert result.startswith("OK")
print("main_ok() completed")
if __name__ == "__main__":
print("Running main_ok: ")
asyncio.run(main_ok()) # work as expected
print("\n\n\nRunning main_not_ok: ")
asyncio.run(main_not_ok()) #blocking forever
Ich habe gekämpft, um es für einen halben Tag zu debuggen und kann es nicht herausfinden. Bitte hilf mir.
Lösung
Sie können es auf Queue
folgende Weise tun:
import asyncio
import time
from random import randint
from threading import Thread
def set_future_result(future: asyncio.Future, event: asyncio.Event, loop: asyncio.AbstractEventLoop):
"""Thread target function. It gives some result to futures."""
async def _event_status_change(_event: asyncio.Event):
"""Wrap event in coroutine to run it threadsafe"""
_event.set()
time.sleep(3)
res = randint(1, 10)
if res > 8:
future.set_exception(Exception(f"Result: {res} " + "Error !!! " * 2))
else:
future.set_result(res)
asyncio.run_coroutine_threadsafe(_event_status_change(event), loop)
async def asyncio_loop_killer(task: asyncio.Task):
"""Just task to finish our app in several seconds."""
n = 20
while n:
await asyncio.sleep(1)
n -= 1
if task.done():
break
else:
task.cancel()
async def tasks_producer():
"""Main function of our app. It produces futures for children threads."""
loop = asyncio.get_event_loop()
while True:
future, event = asyncio.Future(), asyncio.Event()
event.clear()
worker = Thread(target=set_future_result, args=(future, event, loop,), daemon=True)
worker.start()
await event.wait()
if res := future.exception():
print(f"Error: {res}")
break
print(f"Result: {future.result()}")
async def async_main():
"""Wrapper around all async activity."""
producer_task = asyncio.create_task(tasks_producer())
await asyncio_loop_killer(producer_task)
if __name__ == '__main__':
asyncio.run(async_main())
Auch Überprüfung add_done_callback
vonFuture
Beantwortet von – Artiom Kozyrev
Antwort geprüft von – David Goodson (FixError Volunteer)