개발하자

빠른 API 비동기 백그라운드 작업이 다른 요청을 차단합니까?

Cuire 2023. 1. 24. 01:45
반응형

빠른 API 비동기 백그라운드 작업이 다른 요청을 차단합니까?

데이터베이스에 덤프하기 전에 계산을 포함하는 간단한 백그라운드 작업을 FastAPI에서 실행하려고 합니다. 그러나 계산은 더 이상의 요청을 수신하는 것을 차단할 것이다.

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()
db = Database()

async def task(data):
    otherdata = await db.fetch("some sql")
    newdata = somelongcomputation(data,otherdata) # this blocks other requests
    await db.execute("some sql",newdata)
   


@app.post("/profile")
async def profile(data: Data, background_tasks: BackgroundTasks):
    background_tasks.add_task(task, data)
    return {}

이 문제를 해결하는 가장 좋은 방법은 무엇입니까?




는 로 정의되며, 이는 fastapi(또는 starlette)가 비동기 이벤트 루프에서 실행한다는 것을 의미합니다. 그리고 동기화되어 있기 때문에(즉, IO를 기다리는 것이 아니라 계산을 하는 것) 실행 중인 동안 이벤트 루프를 차단합니다.

이 문제를 해결하는 몇 가지 방법이 있습니다:

  • 작업자를 더 많이 사용합니다(예: ). 이렇게 하면 병렬로 최대 4개까지 허용됩니다.

  • 작업을 하지 않도록 다시 작성합니다(즉, 작업을 등으로 정의). 그런 다음 starlette는 그것을 별도의 스레드에서 실행할 것입니다.

  • 를 사용하면 별도의 스레드에서도 실행됩니다. 이와 같이:

    from fastapi.concurrency import run_in_threadpool
    async def task(data):
        otherdata = await db.fetch("some sql")
        newdata = await run_in_threadpool(lambda: somelongcomputation(data, otherdata))
        await db.execute("some sql", newdata)
    
    • 또는 직접 사용(후드 아래에서 사용): 별도의 프로세스에서 실행하기 위한 첫 번째 인수로 a를 전달할 수도 있습니다.
  • 별도의 스레드/프로세스를 생성합니다. 예: 을 사용합니다.

  • 셀러리와 같은 좀 더 고압적인 것을 사용하십시오. (fastapi 문서에서도 언급됨).




이거 읽어봐.

아래 예제에서도 차단 함수 또는 프로세스일 수 있습니다.

TL;DR

from starlette.concurrency import run_in_threadpool

@app.get("/long_answer")
async def long_answer():
    rst = await run_in_threadpool(my_model.function_b, arg_1, arg_2)
    return rst



작업이 CPU에 바인딩된 경우 다중 처리를 사용할 수 있습니다. 빠른 속도에서 백그라운드 작업을 사용할 수 있습니다API:

CPU 부하가 높은 작업이 많은 경우 Celery와 같은 것을 사용하는 것을 고려해야 합니다.




다음은 빠른 백그라운드 작업의 예입니다API

from fastapi import FastAPI
import asyncio
app = FastAPI()
x = [1]           # a global variable x
@app.get("/")
def hello():
    return {"message": "hello", "x":x}
async def periodic():
    while True:
        # code to run periodically starts here
        x[0] += 1
        print(f"x is now {x}")
        # code to run periodically ends here
        # sleep for 3 seconds after running above code
        await asyncio.sleep(3)
@app.on_event("startup")
async def schedule_periodic():
    loop = asyncio.get_event_loop()
    loop.create_task(periodic())
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app)

반응형