[FIXED] Erstellen Sie asyncio Future und füllen Sie das Ergebnis in einem anderen Thread über die Warteschlange aus, die nicht funktioniert

Ausgabe

Das Problem ist, dass ich eine asynchrone Methode/Bibliothek wie folgt erstellen muss (damit sie die Asyncio-Ereignisschleife nicht blockiert):

  1. Future aus der zu definierenden asynchronen Methode erstellen (sagen wir es methodA)
  2. Setzen Sie die Zukunft in eine Warteschlange / Liste / Diktat und ein Dienst füllt das Ergebnis in die Zukunft, wenn es verfügbar ist (es dauert lange, bis das Ergebnis verfügbar ist).
  3. Erwarten Sie die Zukunft in der MethodeA

Das Problem, das ich habe, ist das Warten auf die geschaffene Zukunft, die für immer blockiert ist, wie das vereinfachte Beispiel unten zeigt.

import asyncio
from asyncio import Future
from queue import Queue
from threading import Thread

futures_queue: Queue[Future] = Queue()


def fill_result_service():
    counter = 0
    while True:
        fut = futures_queue.get()
        print(f"Processing fut={id(fut)}")
        fut.set_result(f"OK: {counter}")
        counter += 1


filler_thread = Thread(target=fill_result_service)
filler_thread.start()


async def main_not_ok():
    fut: Future[str] = asyncio.get_running_loop().create_future()
    print(f"Putting fut={id(fut)} into queue")
    futures_queue.put(fut)
    result = await fut
    assert result.startswith("OK")
    print("main_not_ok() completed")


async def main_ok():
    fut: Future[str] = asyncio.get_running_loop().create_future()
    tmp_thread = Thread(target=lambda: fut.set_result("OK: Local thread"))
    tmp_thread.start()
    result = await fut
    assert result.startswith("OK")
    print("main_ok() completed")
    


if __name__ == "__main__":
    print("Running main_ok: ")
    asyncio.run(main_ok()) # work as expected
    print("\n\n\nRunning main_not_ok: ")
    asyncio.run(main_not_ok()) #blocking forever

Ich habe gekämpft, um es für einen halben Tag zu debuggen und kann es nicht herausfinden. Bitte hilf mir.

Lösung

Sie können es auf Queuefolgende Weise tun:

import asyncio
import time
from random import randint
from threading import Thread


def set_future_result(future: asyncio.Future, event: asyncio.Event, loop: asyncio.AbstractEventLoop):
    """Thread target function. It gives some result to futures."""
    async def _event_status_change(_event: asyncio.Event):
        """Wrap event in coroutine to run it threadsafe"""
        _event.set()

    time.sleep(3)
    res = randint(1, 10)
    if res > 8:
        future.set_exception(Exception(f"Result: {res} " + "Error  !!! " * 2))
    else:
        future.set_result(res)
    asyncio.run_coroutine_threadsafe(_event_status_change(event), loop)


async def asyncio_loop_killer(task: asyncio.Task):
    """Just task to finish our app in several seconds."""
    n = 20
    while n:
        await asyncio.sleep(1)
        n -= 1
        if task.done():
            break
    else:
        task.cancel()


async def tasks_producer():
    """Main function of our app. It produces futures for children threads."""
    loop = asyncio.get_event_loop()
    while True:
        future, event = asyncio.Future(), asyncio.Event()
        event.clear()
        worker = Thread(target=set_future_result, args=(future, event, loop,), daemon=True)
        worker.start()
        await event.wait()
        if res := future.exception():
            print(f"Error: {res}")
            break

        print(f"Result: {future.result()}")


async def async_main():
    """Wrapper around all async activity."""
    producer_task = asyncio.create_task(tasks_producer())
    await asyncio_loop_killer(producer_task)

if __name__ == '__main__':
    asyncio.run(async_main())

Auch Überprüfung add_done_callbackvonFuture


Beantwortet von –
Artiom Kozyrev


Antwort geprüft von –
David Goodson (FixError Volunteer)

0 Shares:
Leave a Reply

Your email address will not be published. Required fields are marked *

You May Also Like