[FIXED] Wie kann man DataFrame mit FastAPI streamen, ohne die Daten in einer CSV-Datei zu speichern?

Ausgabe

Ich würde gerne wissen, wie man einen DataFrame mit FastAPI streamt, ohne den DataFrame in einer CSV-Datei auf der Festplatte speichern zu müssen. Derzeit ist es mir gelungen, Daten aus der CSV-Datei zu streamen, aber die Geschwindigkeit war im Vergleich zur Rückgabe einer FileResponse. Das /option7Folgende ist, was ich versuche zu tun.

Mein Ziel ist es, Daten vom FastAPI-Backend zu streamen, ohne den DataFrame in einer CSV-Datei zu speichern.

Vielen Dank.

from fastapi import FastAPI, Response,Query
from fastapi.responses import FileResponse,HTMLResponse,StreamingResponse
app = FastAPI()

df = pd.read_csv("data.csv")

@app.get("/option4")
def load_questions():
    return FileResponse(path="C:Downloads/data.csv", filename="data.csv")

@app.get("/option5")
def load_questions():
    def iterfile():  # 
        with open('data.csv', mode="rb") as file_like:  # 
            yield from file_like  # 

    return StreamingResponse(iterfile(), media_type="text/csv")

@app.get("/option7")
def load_questions():
    def iterfile():  # 
        #with open(df, mode="rb") as file_like:  # 
        yield from df  # 

    return StreamingResponse(iterfile(), media_type="application/json")


Lösung

Empfohlener Ansatz

Wie in dieser Antwort sowie hier und hierDataFrame erwähnt, besteht keine Notwendigkeit, zu verwenden , wenn die gesamten Daten (oder in Ihrem Fall) bereits in den Speicher geladen sind StreamingResponse. StreamingResponseist sinnvoll, wenn Sie Echtzeitdaten übertragen möchten und wenn Sie die Größe Ihrer Ausgabe nicht im Voraus kennen und nicht warten möchten, bis Sie alles gesammelt haben, um es herauszufinden, bevor Sie damit beginnen, es an den Client zu senden , sowie wenn eine Datei, die Sie zurückgeben möchten, zu groß ist, um in den Speicher zu passen – wenn Sie beispielsweise 8 GB RAM haben, können Sie keine 50 GB-Datei laden – und Sie die Datei daher lieber in laden möchten Speicher in Stücken.

Da der DataFrame in Ihrem Fall bereits in den Speicher geladen ist, sollten Sie stattdessen Responsedirekt einen benutzerdefinierten Wert zurückgeben, nachdem Sie die .to_json()Methode zum Konvertieren von DataFramein eine JSON-Zeichenfolge verwendet haben, wie in dieser Antwort beschrieben (siehe auch diesen verwandten Beitrag ). Beispiel:

from fastapi import Response

@app.get("/")
def main():
    return Response(df.to_json(orient="records"), media_type="application/json")

Wenn Sie feststellen, dass der Browser eine Weile braucht, um die Daten anzuzeigen, möchten Sie die Daten möglicherweise als Datei auf das Gerät des Benutzers herunterladen.json (was viel schneller abgeschlossen wäre), anstatt darauf zu warten, dass der Browser eine große Datenmenge anzeigt . Sie können dies tun, indem Sie den Content-DispositionHeader Responsemithilfe des attachmentParameters festlegen (siehe diese Antwort für weitere Details):

@app.get("/")
def main():
    headers = {'Content-Disposition': 'attachment; filename="data.json"'}
    return Response(df.to_json(orient="records"), headers=headers, media_type='application/json')

Sie könnten die Daten auch als .csvDatei zurückgeben, indem Sie die .to_csv()Methode verwenden, ohne den Pfadparameter anzugeben. Da die Verwendung return df.to_csv()dazu führen würde, dass die Daten im Browser mit \r\neingeschlossenen Zeichen angezeigt werden, finden Sie es möglicherweise besser, die CSV-Daten Responsestattdessen in eine einzufügen und den Content-DispositionHeader anzugeben, sodass die Daten als .csvDatei heruntergeladen werden. Beispiel:

@app.get("/")
def main():
    headers = {'Content-Disposition': 'attachment; filename="data.csv"'}
    return Response(df.to_csv(), headers=headers, media_type="text/csv")

NICHT empfohlener Ansatz

Um eine zu verwenden StreamingResponse, müssten Sie die Zeilen in einem DataFrame durchlaufen, jede Zeile in ein Wörterbuch und anschließend in eine JSON-Zeichenfolge konvertieren, indem Sie entweder die Standardbibliothek jsonoder andere schnellere JSON-Encoder verwenden, wie in dieser Antwort beschrieben (die JSON-Zeichenfolge wird später byteintern von FastAPI/Starlette in das Format kodiert, wie im Quellcode hier gezeigt ). Beispiel:

@app.get("/")
def main():
    def iter_df():
        for _, row in df.iterrows():
            yield json.dumps(row.to_dict()) + '\n'

    return StreamingResponse(iter_df(), media_type="application/json")

Das Durchlaufen von Pandas-Objekten ist im Allgemeinen langsam und wird nicht empfohlen . Wie in dieser Antwort beschrieben :

Iteration in Pandas is an anti-pattern and is something you should
only do when you have exhausted every other option
. You should
not
use any function with "iter" in its name for more than a few
thousand rows or you will have to get used to a lot of waiting.

Update

As @Panagiotis Kanavos noted in the comments section below, using either .to_json() or .to_csv() on the DataFrame that is already loaded into memory, would result in allocating the entire output string in memory, thus doubling the RAM usage or even worse. Hence, in the case of having such a huge amount of data that may cause your system to slow down or crash (because of running out of memory) if used either method above, you should rather use StreamingResponse, as described earlier. You may find faster alernative methods to iterrows() in this post, as well as faster JSON encoders, such as orjson and ujson, as described in this answer.

Alternatively, you could save the data to disk, then delete the DataFrame to release the memory—you can even manually trigger the garbage collection using gc.collect(), as shown in this answer; however, frequent calls to garbage collection is discouraged, as it is a costly operation and may affect performance—and return a FileResponse (assuming the data can fit into RAM; otherwise, you should use StreamingResponse, see this answer as well), and finally, have a BackgroundTask to delete the file from disk after returning the response. Example is given below.

Regardless, the solution you may choose should be based on your application’s requirements, e.g., the number of users you expect to serve simultaneously, the size of data, the response time, etc.), as well as your system’s specifications (e.g., avaialable memory for allocation). Additionally, since all calls to DataFrame‘s methods are synchronous, you should remember to define your endpoint with a normal def, so that it is run in an external threadpool; otherwise, it would block the server. Alternatively, you could use Starlette’s run_in_threadpool() from the concurrency module, which will run the to_csv() or to_json() function in a separate thread to ensure that the main thread (where coroutines are run) does not get blocked. Please have a look at this answer for more details on defgegen async def.

from fastapi import BackgroundTasks
from fastapi.responses import FileResponse 
import uuid
import os

@app.get("/")
def main(background_tasks: BackgroundTasks):
    filename = str(uuid.uuid4()) + ".csv"
    df.to_csv(filename)
    del df  # release the memory
    background_tasks.add_task(os.remove, filename) 
    return FileResponse(filename, filename="data.csv", media_type="text/csv")


Beantwortet von –
Chris


Antwort geprüft von –
Willingham (FixError Volunteer)

0 Shares:
Leave a Reply

Your email address will not be published. Required fields are marked *

You May Also Like