Ausgabe
Ich würde gerne wissen, wie man einen DataFrame mit FastAPI streamt, ohne den DataFrame in einer CSV-Datei auf der Festplatte speichern zu müssen. Derzeit ist es mir gelungen, Daten aus der CSV-Datei zu streamen, aber die Geschwindigkeit war im Vergleich zur Rückgabe einer FileResponse
. Das /option7
Folgende ist, was ich versuche zu tun.
Mein Ziel ist es, Daten vom FastAPI-Backend zu streamen, ohne den DataFrame in einer CSV-Datei zu speichern.
Vielen Dank.
from fastapi import FastAPI, Response,Query
from fastapi.responses import FileResponse,HTMLResponse,StreamingResponse
app = FastAPI()
df = pd.read_csv("data.csv")
@app.get("/option4")
def load_questions():
return FileResponse(path="C:Downloads/data.csv", filename="data.csv")
@app.get("/option5")
def load_questions():
def iterfile(): #
with open('data.csv', mode="rb") as file_like: #
yield from file_like #
return StreamingResponse(iterfile(), media_type="text/csv")
@app.get("/option7")
def load_questions():
def iterfile(): #
#with open(df, mode="rb") as file_like: #
yield from df #
return StreamingResponse(iterfile(), media_type="application/json")
Lösung
Empfohlener Ansatz
Wie in dieser Antwort sowie hier und hierDataFrame
erwähnt, besteht keine Notwendigkeit, zu verwenden , wenn die gesamten Daten (oder in Ihrem Fall) bereits in den Speicher geladen sind StreamingResponse
. StreamingResponse
ist sinnvoll, wenn Sie Echtzeitdaten übertragen möchten und wenn Sie die Größe Ihrer Ausgabe nicht im Voraus kennen und nicht warten möchten, bis Sie alles gesammelt haben, um es herauszufinden, bevor Sie damit beginnen, es an den Client zu senden , sowie wenn eine Datei, die Sie zurückgeben möchten, zu groß ist, um in den Speicher zu passen – wenn Sie beispielsweise 8 GB RAM haben, können Sie keine 50 GB-Datei laden – und Sie die Datei daher lieber in laden möchten Speicher in Stücken.
Da der DataFrame in Ihrem Fall bereits in den Speicher geladen ist, sollten Sie stattdessen Response
direkt einen benutzerdefinierten Wert zurückgeben, nachdem Sie die .to_json()
Methode zum Konvertieren von DataFrame
in eine JSON-Zeichenfolge verwendet haben, wie in dieser Antwort beschrieben (siehe auch diesen verwandten Beitrag ). Beispiel:
from fastapi import Response
@app.get("/")
def main():
return Response(df.to_json(orient="records"), media_type="application/json")
Wenn Sie feststellen, dass der Browser eine Weile braucht, um die Daten anzuzeigen, möchten Sie die Daten möglicherweise als Datei auf das Gerät des Benutzers herunterladen.json
(was viel schneller abgeschlossen wäre), anstatt darauf zu warten, dass der Browser eine große Datenmenge anzeigt . Sie können dies tun, indem Sie den Content-Disposition
Header Response
mithilfe des attachment
Parameters festlegen (siehe diese Antwort für weitere Details):
@app.get("/")
def main():
headers = {'Content-Disposition': 'attachment; filename="data.json"'}
return Response(df.to_json(orient="records"), headers=headers, media_type='application/json')
Sie könnten die Daten auch als .csv
Datei zurückgeben, indem Sie die .to_csv()
Methode verwenden, ohne den Pfadparameter anzugeben. Da die Verwendung return df.to_csv()
dazu führen würde, dass die Daten im Browser mit \r\n
eingeschlossenen Zeichen angezeigt werden, finden Sie es möglicherweise besser, die CSV-Daten Response
stattdessen in eine einzufügen und den Content-Disposition
Header anzugeben, sodass die Daten als .csv
Datei heruntergeladen werden. Beispiel:
@app.get("/")
def main():
headers = {'Content-Disposition': 'attachment; filename="data.csv"'}
return Response(df.to_csv(), headers=headers, media_type="text/csv")
NICHT empfohlener Ansatz
Um eine zu verwenden StreamingResponse
, müssten Sie die Zeilen in einem DataFrame durchlaufen, jede Zeile in ein Wörterbuch und anschließend in eine JSON-Zeichenfolge konvertieren, indem Sie entweder die Standardbibliothek json
oder andere schnellere JSON-Encoder verwenden, wie in dieser Antwort beschrieben (die JSON-Zeichenfolge wird später byte
intern von FastAPI/Starlette in das Format kodiert, wie im Quellcode hier gezeigt ). Beispiel:
@app.get("/")
def main():
def iter_df():
for _, row in df.iterrows():
yield json.dumps(row.to_dict()) + '\n'
return StreamingResponse(iter_df(), media_type="application/json")
Das Durchlaufen von Pandas-Objekten ist im Allgemeinen langsam und wird nicht empfohlen . Wie in dieser Antwort beschrieben :
Iteration in Pandas is an anti-pattern and is something you should
only do when you have exhausted every other option. You should
not use any function with"iter"
in its name for more than a few
thousand rows or you will have to get used to a lot of waiting.
Update
As @Panagiotis Kanavos noted in the comments section below, using either .to_json()
or .to_csv()
on the DataFrame that is already loaded into memory, would result in allocating the entire output string in memory, thus doubling the RAM usage or even worse. Hence, in the case of having such a huge amount of data that may cause your system to slow down or crash (because of running out of memory) if used either method above, you should rather use StreamingResponse
, as described earlier. You may find faster alernative methods to iterrows()
in this post, as well as faster JSON encoders, such as orjson
and ujson
, as described in this answer.
Alternatively, you could save the data to disk, then delete the DataFrame to release the memory—you can even manually trigger the garbage collection using gc.collect()
, as shown in this answer; however, frequent calls to garbage collection is discouraged, as it is a costly operation and may affect performance—and return a FileResponse
(assuming the data can fit into RAM; otherwise, you should use StreamingResponse
, see this answer as well), and finally, have a BackgroundTask
to delete the file from disk after returning the response. Example is given below.
Regardless, the solution you may choose should be based on your application’s requirements, e.g., the number of users you expect to serve simultaneously, the size of data, the response time, etc.), as well as your system’s specifications (e.g., avaialable memory for allocation). Additionally, since all calls to DataFrame
‘s methods are synchronous, you should remember to define your endpoint with a normal def
, so that it is run in an external threadpool; otherwise, it would block the server. Alternatively, you could use Starlette’s run_in_threadpool()
from the concurrency
module, which will run the to_csv()
or to_json()
function in a separate thread to ensure that the main thread (where coroutines are run) does not get blocked. Please have a look at this answer for more details on def
gegen async def
.
from fastapi import BackgroundTasks
from fastapi.responses import FileResponse
import uuid
import os
@app.get("/")
def main(background_tasks: BackgroundTasks):
filename = str(uuid.uuid4()) + ".csv"
df.to_csv(filename)
del df # release the memory
background_tasks.add_task(os.remove, filename)
return FileResponse(filename, filename="data.csv", media_type="text/csv")
Beantwortet von – Chris
Antwort geprüft von – Willingham (FixError Volunteer)