본문 바로가기

개발하자

빠른 API 비동기 백그라운드 작업이 다른 요청을 차단합니까?

반응형

빠른 API 비동기 백그라운드 작업이 다른 요청을 차단합니까?

데이터베이스에 덤프하기 전에 계산을 포함하는 간단한 백그라운드 작업을 FastAPI에서 실행하려고 합니다. 그러나 계산은 더 이상의 요청을 수신하는 것을 차단할 것이다.

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()
db = Database()

async def task(data):
    otherdata = await db.fetch("some sql")
    newdata = somelongcomputation(data,otherdata) # this blocks other requests
    await db.execute("some sql",newdata)
   


@app.post("/profile")
async def profile(data: Data, background_tasks: BackgroundTasks):
    background_tasks.add_task(task, data)
    return {}

이 문제를 해결하는 가장 좋은 방법은 무엇입니까?




는 로 정의되며, 이는 fastapi(또는 starlette)가 비동기 이벤트 루프에서 실행한다는 것을 의미합니다. 그리고 동기화되어 있기 때문에(즉, IO를 기다리는 것이 아니라 계산을 하는 것) 실행 중인 동안 이벤트 루프를 차단합니다.

이 문제를 해결하는 몇 가지 방법이 있습니다:

  • 작업자를 더 많이 사용합니다(예: ). 이렇게 하면 병렬로 최대 4개까지 허용됩니다.

  • 작업을 하지 않도록 다시 작성합니다(즉, 작업을 등으로 정의). 그런 다음 starlette는 그것을 별도의 스레드에서 실행할 것입니다.

  • 를 사용하면 별도의 스레드에서도 실행됩니다. 이와 같이:

    from fastapi.concurrency import run_in_threadpool
    async def task(data):
        otherdata = await db.fetch("some sql")
        newdata = await run_in_threadpool(lambda: somelongcomputation(data, otherdata))
        await db.execute("some sql", newdata)
    
    • 또는 직접 사용(후드 아래에서 사용): 별도의 프로세스에서 실행하기 위한 첫 번째 인수로 a를 전달할 수도 있습니다.
  • 별도의 스레드/프로세스를 생성합니다. 예: 을 사용합니다.

  • 셀러리와 같은 좀 더 고압적인 것을 사용하십시오. (fastapi 문서에서도 언급됨).




이거 읽어봐.

아래 예제에서도 차단 함수 또는 프로세스일 수 있습니다.

TL;DR

from starlette.concurrency import run_in_threadpool

@app.get("/long_answer")
async def long_answer():
    rst = await run_in_threadpool(my_model.function_b, arg_1, arg_2)
    return rst



작업이 CPU에 바인딩된 경우 다중 처리를 사용할 수 있습니다. 빠른 속도에서 백그라운드 작업을 사용할 수 있습니다API:

CPU 부하가 높은 작업이 많은 경우 Celery와 같은 것을 사용하는 것을 고려해야 합니다.




다음은 빠른 백그라운드 작업의 예입니다API

from fastapi import FastAPI
import asyncio
app = FastAPI()
x = [1]           # a global variable x
@app.get("/")
def hello():
    return {"message": "hello", "x":x}
async def periodic():
    while True:
        # code to run periodically starts here
        x[0] += 1
        print(f"x is now {x}")
        # code to run periodically ends here
        # sleep for 3 seconds after running above code
        await asyncio.sleep(3)
@app.on_event("startup")
async def schedule_periodic():
    loop = asyncio.get_event_loop()
    loop.create_task(periodic())
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app)

반응형