FastAPI는 병렬 방식이 아닌 직렬 방식으로 API 호출 실행
FastAPI는 병렬 방식이 아닌 직렬 방식으로 API 호출 실행
다음 코드가 있습니다:
import time
from fastapi import FastAPI, Request
app = FastAPI()
@app.get("/ping")
async def ping(request: Request):
print("Hello")
time.sleep(5)
print("bye")
return {"ping": "pong!"}
동일한 브라우저 창의 다른 탭에서 localhost(예: )에서 코드를 실행하면 다음을 얻을 수 있습니다:
Hello
bye
Hello
bye
대신:
Hello
Hello
bye
bye
나는 사용에 대해 읽었지만, 여전히 진정한 병렬화를 할 수 없다. 문제가 뭐죠?
질문:
" 뭐가 문제죠? "
FastAPI 문서는 프레임워크가 (에서 상속된) 진행 중인 작업을 사용한다고 명시적으로 말합니다.
이는 그 자체로 이러한 모든 작업이 Python Interpreter GIL-lock을 수신하기 위해 경쟁한다는 것을 의미한다. 즉, MUTEX를 위협하는 글로벌 인터프리터 잠금이며, 이는 프로세스 중인 모든 양의 Python Interpreter 스레드를 다음과 같이 작동하도록 재설계한다...
세부적으로 보면 결과를 볼 수 있습니다. 두 번째(두 번째 FireFox 탭에서 수동으로 시작) http-request 도착에 대해 다른 핸들러를 생성하는 데 실제로 수면 시간보다 오래 걸리는 경우 GIL-lock 인터리브 시간-퀀타 라운드 로빈(GIL-lock 릴리스-획득-룰렛의 각 다음 라운드가 실행되기 전에 모든 대기-원-캔-작업)의 결과입니다python Interpreter 내부 작업은 더 많은 세부 정보를 보여주지 않으며, 더 많은 LoD를 보기 위해 에서 더 많은 세부 정보를 사용할 수 있다:
import time
import threading
from fastapi import FastAPI, Request
TEMPLATE = "INF[{0:_>20d}]: t_id( {1: >20d} ):: {2:}"
print( TEMPLATE.format( time.perf_counter_ns(),
threading.get_ident(),
"Python Interpreter __main__ was started ..."
)
...
@app.get("/ping")
async def ping( request: Request ):
""" __doc__
[DOC-ME]
ping( Request ): a mock-up AS-IS function to yield
a CLI/GUI self-evidence of the order-of-execution
RETURNS: a JSON-alike decorated dict
[TEST-ME] ...
"""
print( TEMPLATE.format( time.perf_counter_ns(),
threading.get_ident(),
"Hello..."
)
#------------------------------------------------- actual blocking work
time.sleep( 5 )
#------------------------------------------------- actual blocking work
print( TEMPLATE.format( time.perf_counter_ns(),
threading.get_ident(),
"...bye"
)
return { "ping": "pong!" }
마지막으로, 중요한 것은, 스레드 기반 코드가 겪을 수 있는 모든 문제에 대해 자세히 읽어보는 것입니다... 심지어 커튼 뒤에서... 원인이 되기도 합니다...
광고 비망
GIL-lock, 스레드 기반 풀, 비동기식 데코레이터, 차단 및 이벤트 처리의 혼합 - 불확실성과 HWY2에 대한 확실한 혼합HEL ;o)
기준:
대신 normal을 사용하여 경로 작업 함수를 선언하면 서버를 차단하는 것처럼 직접 호출되지 않고 외부 스레드 풀에서 실행됩니다.
또한, 설명된 바와 같이:
데이터베이스, API, 파일 시스템 등과 통신하는 타사 라이브러리를 사용하고 있으며 현재 대부분의 데이터베이스 라이브러리에서 사용할 수 있는 기능이 없는 경우 경로 작업 함수를 정상적으로 선언하십시오.
프로그램이 다른 프로그램과 통신하고 응답을 기다릴 필요가 없는 경우 를 사용하십시오.
그냥 모르면 그냥 보통으로 해요.
: 필요한 만큼 경로 연산 기능을 혼합하고 사용자에게 가장 적합한 옵션을 사용하여 각 기능을 정의할 수 있습니다. FastAPI는 그들에게 옳은 일을 할 것이다.
어쨌든 위의 어떤 경우든 Fast API를 사용하면 매우 빠릅니다.
그러나 위의 단계를 따름으로써 성능을 최적화할 수 있습니다.
그러므로 (동기화) 루트는 스레드 풀에서 별도의 스레드에서 실행되거나, 다시 말해 서버는 요청을 처리하는 반면, 루트는 메인 (단일) 스레드에서 실행됩니다. 즉, 서버는 요청을 처리합니다. 네트워크를 통해 전송될 클라이언트의 데이터, 디스크의 파일 내용과 같은 루트 내의 (일반적으로) 작업에 대한 호출입니다k 읽기, 완료할 데이터베이스 작업 등.—한번 보다. 키워드는 기능 제어를 이벤트 루프로 다시 전달합니다. 즉, 주변 코루틴의 실행을 중단하고, 완료될 때까지 다른 것을 실행하도록 이벤트 루프에 지시합니다. 키워드는 함수 내에서만 작동합니다. 예를 들어, FastAPI/Starlette의 메소드를 사용할 때 파일 메소드를 a(함수 사용)와 s에서 실행합니다. 자세한 내용은 을 참조하십시오.
와의 비동기 코드는 협업적이며, 이는 "언제든지 코루틴이 있는 프로그램은 코루틴 중 하나만 실행되며, 이 실행 코루틴은 명시적으로 중단을 요청할 때만 실행을 중단한다"를 의미한다.
그러나 함수/엔드포인트 내부에서 I/O 바운드 또는 CPU 바운드 차단 작업을 실행한 경우에는 해당 작업이 수행됩니다(예: ). 따라서 경로에서 와 같은 차단 작업을 수행하면 전체 서버가 차단됩니다(질문에 제공된 예 참조). 따라서 함수가 호출을 수행하지 않을 경우 다음과 같이 (외부 스레드 풀에서 실행된 후 대기)로 선언할 수 있습니다:
@app.get("/ping")
def ping(request: Request):
#print(request.client)
print("Hello")
time.sleep(5)
print("bye")
return "pong"
그렇지 않으면 기능을 호출하려면 를 사용하여 엔드포인트를 정의해야 합니다. 이를 입증하기 위해 아래는 라이브러리의 기능을 사용합니다. 유사한 예가 제공된다.
import asyncio
@app.get("/ping")
async def ping(request: Request):
#print(request.client)
print("Hello")
await asyncio.sleep(5)
print("bye")
return "pong"
위의 두 경로 작동 기능은 두 개의 요청이 거의 동시에 도착하는 경우 질문에 언급된 것과 동일한 순서로 화면에 지정된 메시지를 출력합니다:
Hello
Hello
bye
bye
중요 참고
두 번째(세 번째 등) 시간 동안 엔드포인트를 호출할 때는 에서 이 작업을 수행해야 합니다. 그렇지 않으면 브라우저가 다음 요청을 보내기 전에 이전 요청에 대한 서버의 응답을 기다리고 있기 때문에 후속 요청(즉, 첫 번째 요청 이후)이 브라우저에 의해 차단됩니다. 엔드포인트 내부에서 확인할 수 있습니다. 엔드포인트 내부에서 모든 수신 요청(같은 브라우저 창/세션에서 열린 탭에서 요청이 시작된 경우)에 대해 및 번호가 동일하므로 브라우저에서 요청을 순차적으로 전송하므로 해당 요청이 순차적으로 처리됩니다. 이를 위해 다음 중 하나를 수행할 수 있습니다:
동일한 탭을 다시 로드합니다(실행 중)
Incognito 창에서 새 탭을 엽니다
다른 브라우저/클라이언트를 사용하여 요청을 전송하거나
라이브러리를 와 함께 사용하면 여러 비동기 작업을 동시에 실행한 다음 대기 가능(작업)이 해당 함수에 전달된 순서대로 결과 목록을 반환합니다(자세한 내용은 를 참조하십시오).
:
import httpx import asyncio URLS = ['http://127.0.0.1:8000/ping'] * 2 async def send(url, client): return await client.get(url, timeout=10) async def main(): async with httpx.AsyncClient() as client: tasks = [send(url, client) for url in URLS] responses = await asyncio.gather(*tasks) print(*[r.json() for r in responses], sep='\n') asyncio.run(main())
요청을 처리하는 데 서로 다른 시간이 걸릴 수 있는 다른 엔드포인트를 호출해야 하는 경우, 모든 작업의 결과를 수집하고 작업이 함수에 전달된 순서와 동일한 순서로 출력하기를 기다리지 않고 서버에서 반환되는 즉시 클라이언트 측에서 응답을 출력하려는 경우, 이를 대체할 수 있습니다위 예제의 e 기능은 아래와 같습니다:
async def send(url, client): res = await client.get(url, timeout=10) print(res.json()) return res
비동기/대기 및 IO 바인딩/CPU 바인딩 작업 차단
루트 내의 코루틴을 사용해야 하지만 이벤트 루프(기본적으로 전체 서버)를 차단하고 다른 요청을 통과시키지 않는 동기식 I/O 바운드 또는 CPU 바운드 작업(장기 실행 계산 작업)이 필요한 경우 다음과 같은 경우:
@app.post("/ping")
async def ping(file: UploadFile = File(...)):
print("Hello")
try:
contents = await file.read()
res = cpu_bound_task(contents) # this will block the event loop
finally:
await file.close()
print("bye")
return "pong"
그러면:
예를 들어, 엔드포인트에서 파일 내용을 읽는 방법(아래 주석 섹션에서 언급한 것처럼)만 기다리고 있으면 엔드포인트의 매개 변수 유형을 (즉, )로 선언할 수 있으므로 FastAPI는 다음과 같이 정의할 수 있습니다 당신을 위해 파일을 읽으면 당신은 그 내용을 받을 것이다. 따라서 파일 내용 전체가 메모리에 저장되므로 작은 파일에 대해서는 위의 방법을 사용할 수 있습니다( 참조). 따라서 시스템에 축적된 데이터를 수용할 수 있는 충분한 RAM이 없는 경우(예: 8GB RAM이 있는 경우 50GB 파일을 로드할 수 없습니다)이온이 충돌할 수도 있습니다. 또는 객체의 속성을 통해 직접 액세스할 수 있는 메소드를 호출하여 메소드를 다시 호출할 필요가 없으며, 이제 일반 엔드포인트를 선언할 수 있으므로 각 요청이 에서 실행됩니다(아래 예 참조). 업로드 방법과 Starlette/FastAPI가 백그라운드에서 사용하는 방법에 대한 자세한 내용은 및 을 참조하십시오.
@app.post("/ping") def ping(file: UploadFile = File(...)): print("Hello") try: contents = file.file.read() res = cpu_bound_task(contents) finally: file.file.close() print("bye") return "pong"
빠른 사용@tiangolo가 제안한 대로 모듈의 API(Starlette's) 함수는 "주 스레드(코루틴이 실행되는 곳)가 차단되지 않도록 하기 위해 별도의 스레드에서 함수를 실행할 것"이다. @tiangolo에서 설명한 바와 같이, "는 대기 함수이며, 첫 번째 매개 변수는 정상 함수이며, 다음 매개 변수는 해당 함수에 직접 전달됩니다. 시퀀스 인수와 키워드 인수를 모두 지원한다."
from fastapi.concurrency import run_in_threadpool res = await run_in_threadpool(cpu_bound_task, contents)
또는 를 사용하여 실행 중인 이벤트 루프를 얻은 후 를 사용하여 작업을 실행합니다. 이 경우 다음 코드 행으로 이동하기 전에 작업을 완료하고 결과를 반환할 수 있습니다. 인수를 전달하면 기본 실행자가 사용됩니다:
import asyncio loop = asyncio.get_running_loop() res = await loop.run_in_executor(None, cpu_bound_task, contents)
또는 다음에 대한 설명서에서 권장하는 식을 사용할 수 있습니다:
import asyncio from functools import partial loop = asyncio.get_running_loop() res = await loop.run_in_executor(None, partial(cpu_bound_task, some_arg=contents))
사용자 정의에서 태스크를 실행할 수도 있습니다. 예를 들어:
import asyncio import concurrent.futures loop = asyncio.get_running_loop() with concurrent.futures.ThreadPoolExecutor() as pool: res = await loop.run_in_executor(pool, cpu_bound_task, contents)
에서는 이벤트 루프가 차단되는 것을 성공적으로 방지할 수 있지만 실행할 때 예상할 수 있는 기능을 제공하지는 않습니다. 특히 설명된 작업(예: 오디오 또는 이미지 처리, 기계 학습 등)을 수행해야 할 경우에는 더욱 그렇습니다. 따라서 작업을 완료하고 결과를 반환하기 위해 와 통합할 수 있는 아래 그림과 같이 를 사용하는 것이 좋습니다. 설명한 바와 같이, 윈도우에서는 하위 프로세스 등의 재귀적인 산란을 방지하기 위해 코드의 메인 루프를 보호하는 것이 중요하다. 기본적으로 코드는 아래에 있어야 합니다.
import concurrent.futures loop = asyncio.get_running_loop() with concurrent.futures.ProcessPoolExecutor() as pool: res = await loop.run_in_executor(pool, cpu_bound_task, contents)
를 사용합니다. 예를 들어 를 사용하는 경우 를 참조하십시오. 각 작업자. 즉, 변수/객체 등은 프로세스/작업자 간에 공유되지 않습니다. 이 경우 및 설명에 따라 데이터베이스 저장소 또는 Key-Value 저장소(Caches)를 사용하는 것을 고려해야 합니다. 또한 이 점에 유의하십시오.
If you need to perform heavy background computation and you don't necessarily need it to be run by the same process (for example, you don't need to share memory, variables, etc), you might benefit from using other bigger tools like Celery, as described in FastAPI's documentation.