Python FastAPI에서 원시 HTTP 요청/응답을 기록하는 방법은 무엇입니까?
Python FastAPI에서 원시 HTTP 요청/응답을 기록하는 방법은 무엇입니까?
우리는 Python Fast를 사용하여 웹 서비스를 작성하고 있다쿠버네티스에서 호스팅할 API입니다. 감사 목적으로 /의 원시 JSON 본문을 경로용으로 저장해야 합니다. 와 JSON 모두의 신체 크기는 약이며, 바람직하게는 이것이 응답 시간에 영향을 미치지 않아야 한다. 어떻게 해야 되지?
Fast에서처럼 사용자 지정할 수 있습니다API 공식 문서:
import time
from typing import Callable
from fastapi import APIRouter, FastAPI, Request, Response
from fastapi.routing import APIRoute
class TimedRoute(APIRoute):
def get_route_handler(self) -> Callable:
original_route_handler = super().get_route_handler()
async def custom_route_handler(request: Request) -> Response:
before = time.time()
response: Response = await original_route_handler(request)
duration = time.time() - before
response.headers["X-Response-Time"] = str(duration)
print(f"route duration: {duration}")
print(f"route response: {response}")
print(f"route response headers: {response.headers}")
return response
return custom_route_handler
app = FastAPI()
router = APIRouter(route_class=TimedRoute)
@app.get("/")
async def not_timed():
return {"message": "Not timed"}
@router.get("/timed")
async def timed():
return {"message": "It's the time of my life"}
app.include_router(router)
다른 답변들은 나에게 효과가 없었고 나는 이 문제를 해결하기 위해 스택 오버플로에 대해 꽤 광범위하게 검색했기 때문에, 나는 아래에 나의 해결책을 보여줄 것이다.
주요 문제는 요청 본문 또는 응답 본문을 사용할 때 요청/응답 본문이 스트림에서 이를 읽는 데 소모되기 때문에 온라인에서 제공되는 많은 접근/솔루션이 작동하지 않는다는 것입니다.
이 문제를 해결하기 위해 요청과 응답을 읽은 후 기본적으로 재구성하는 접근법을 적용했습니다. 이는 사용자 '코발레블라드'의 댓글에 크게 기반을 두고 있다.
사용자 지정 미들웨어가 생성되고 나중에 앱에 추가되어 모든 요청과 응답을 기록합니다. 로그를 저장하려면 로거 종류가 필요합니다.
from json import JSONDecodeError
import json
import logging
from typing import Callable, Awaitable, Tuple, Dict, List
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response, StreamingResponse
from starlette.types import Scope, Message
# Set up your custom logger here
logger = ""
class RequestWithBody(Request):
"""Creation of new request with body"""
def __init__(self, scope: Scope, body: bytes) -> None:
super().__init__(scope, self._receive)
self._body = body
self._body_returned = False
async def _receive(self) -> Message:
if self._body_returned:
return {"type": "http.disconnect"}
else:
self._body_returned = True
return {"type": "http.request", "body": self._body, "more_body": False}
class CustomLoggingMiddleware(BaseHTTPMiddleware):
"""
Use of custom middleware since reading the request body and the response consumes the bytestream.
Hence this approach to basically generate a new request/response when we read the attributes for logging.
"""
async def dispatch( # type: ignore
self, request: Request, call_next: Callable[[Request], Awaitable[StreamingResponse]]
) -> Response:
# Store request body in a variable and generate new request as it is consumed.
request_body_bytes = await request.body()
request_with_body = RequestWithBody(request.scope, request_body_bytes)
# Store response body in a variable and generate new response as it is consumed.
response = await call_next(request_with_body)
response_content_bytes, response_headers, response_status = await self._get_response_params(response)
# Logging
# If there is no request body handle exception, otherwise convert bytes to JSON.
try:
req_body = json.loads(request_body_bytes)
except JSONDecodeError:
req_body = ""
# Logging of relevant variables.
logger.info(
f"{request.method} request to {request.url} metadata\n"
f"\tStatus_code: {response.status_code}\n"
f"\tRequest_Body: {req_body}\n"
)
# Finally, return the newly instantiated response values
return Response(response_content_bytes, response_status, response_headers)
async def _get_response_params(self, response: StreamingResponse) -> Tuple[bytes, Dict[str, str], int]:
"""Getting the response parameters of a response and create a new response."""
response_byte_chunks: List[bytes] = []
response_status: List[int] = []
response_headers: List[Dict[str, str]] = []
async def send(message: Message) -> None:
if message["type"] == "http.response.start":
response_status.append(message["status"])
response_headers.append({k.decode("utf8"): v.decode("utf8") for k, v in message["headers"]})
else:
response_byte_chunks.append(message["body"])
await response.stream_response(send)
content = b"".join(response_byte_chunks)
return content, response_headers[0], response_status[0]
옵션 1 - 미들웨어 사용
를 사용할 수 있습니다. A은(는) 응용 프로그램에 오는 각 요청을 사용하므로 특정 엔드포인트에 의해 처리되기 전에 처리할 수 있으며 클라이언트에 반환되기 전에 처리할 수도 있습니다. 를 만들려면 아래와 같이 기능 위에 장식자를 사용합니다. 내부 스트림에서 요청 본문을 사용해야 하기 때문에 (장면 뒤에서, 전자 메서드는 실제로 후자를 호출합니다. 참조) 또는 를 사용하여 나중에 를 해당 끝점으로 전달할 때 요청 본문을 사용할 수 없습니다. 따라서 에 설명된 방법에 따라 요청 본문을 라인 아래에서 사용할 수 있습니다(즉, 아래 기능 사용). 본문의 경우 에 설명된 것과 동일한 방법을 사용하여 본문을 사용한 다음 클라이언트에 반환할 수 있습니다. 아래는 본문을 바이트 객체에 저장하고 사용자 정의를 직접 반환하는 옵션 2를 사용한다.
데이터를 기록하려면 및 에 설명된 대로 를 사용할 수 있습니다. A는 응답이 전송된 후에만 실행되므로 클라이언트가 로깅이 완료되기를 기다릴 필요가 없습니다(따라서 응답 시간은 눈에 띄게 영향을 받지 않습니다).
메모
스트리밍을 하거나 서버의 RAM에 맞지 않는 바디(예: 8GB RAM을 실행하는 시스템에서 100GB 바디를 상상해 보십시오)가 있는 경우, 데이터를 RAM에 저장하기 때문에 누적된 데이터를 수용할 수 있는 공간이 충분하지 않을 수 있습니다. 또한 크기가 큰 경우(예: 크기가 큰 경우 또는 사용 중인 경우 역방향 프록시 쪽) 오류가 발생할 수 있습니다. 전체 응답 본문을 읽을 때까지 클라이언트에 응답할 수 없기 때문입니다. 당신은 그것을 언급했다; 그러므로, 그것은 보통 괜찮을 것이다. (그러나 이것이 문제인지 아닌지를 판단하기 위해 당신의 API가 동시에 처리될 것으로 예상되는 요청 수, RAM을 사용하는 다른 애플리케이션 등과 같은 문제를 사전에 고려하는 것이 항상 좋은 관행이다.). 필요한 경우 예를 들어 ( 참조)를 사용하여 API 끝점에 대한 요청 수를 제한할 수 있습니다.
사용을 특정 경로로만 제한
다음을 통해 사용을 특정 엔드포인트로 제한할 수 있습니다:
- 미리 정의된 경로 목록과 비교하여 미들웨어 내부를 확인합니다(" 섹션 참조),
- 또는 에 설명된 대로 를 사용합니다
- 또는 아래에 설명된 대로 사용자 지정 클래스를 사용합니다.
작업 예제
from fastapi import FastAPI, APIRouter, Response, Request
from starlette.background import BackgroundTask
from fastapi.routing import APIRoute
from starlette.types import Message
from typing import Dict, Any
import logging
app = FastAPI()
logging.basicConfig(filename='info.log', level=logging.DEBUG)
def log_info(req_body, res_body):
logging.info(req_body)
logging.info(res_body)
async def set_body(request: Request, body: bytes):
async def receive() -> Message:
return {'type': 'http.request', 'body': body}
request._receive = receive
@app.middleware('http')
async def some_middleware(request: Request, call_next):
req_body = await request.body()
await set_body(request, req_body)
response = await call_next(request)
res_body = b''
async for chunk in response.body_iterator:
res_body += chunk
task = BackgroundTask(log_info, req_body, res_body)
return Response(content=res_body, status_code=response.status_code,
headers=dict(response.headers), media_type=response.media_type, background=task)
@app.post('/')
def main(payload: Dict[Any, Any]):
return payload
요청 본문 크기가 특정 값을 초과하지 않도록 하는 등 요청 본문에 대한 유효성 검사를 수행하려는 경우 아래와 같이 방법을 사용하여 본문을 한 번에 하나씩 처리할 수 있습니다(와 유사).
@app.middleware('http')
async def some_middleware(request: Request, call_next):
req_body = b''
async for chunk in request.stream():
req_body += chunk
...
옵션 2 - 사용자 정의 클래스 사용
또는 과 유사한 을 사용할 수도 있습니다. 이를 통해 응용프로그램에서 처리하기 전에 본문을 조작할 수 있고 클라이언트로 반환되기 전에 본문을 조작할 수도 있습니다. 이 옵션을 사용하면 아래의 엔드포인트만 사용자 지정 클래스를 사용하므로 이 클래스의 사용을 원하는 경로로 제한할 수도 있습니다.
위의 "" 섹션에서 언급한 동일한 주석이 이 옵션에도 적용된다는 점에 유의해야 합니다. 예를 들어, API가 (아래 예제의 경로에서)를 반환하는 경우(이를 테스트하기 위해 공개 비디오를 찾을 수 있으며, 아래에 사용된 비디오보다 긴 비디오를 사용하여 효과를 더 명확하게 볼 수도 있음), 서버의 RAM이 이를 처리할 수 없는 경우 서버 측의 문제와 전체로 인한 클라이언트 측의 지연이 발생할 수 있습니다 (스트리밍) 응답을 읽고 RAM에 저장한 후 클라이언트에 반환합니다(앞에서 설명한 바와 같이). 이러한 경우, 단순히 해당 엔드포인트에 데코레이터(아래 예)를 사용하지 않고 데코레이터(예: 예)를 사용하면(예: 예) 사용자 지정 클래스에서 a를 반환하는 엔드포인트를 제외할 수 있습니다(특히, 큰 비디오 파일이거나, 로그에 저장하는 것이 그다지 의미가 없는 라이브 비디오인 경우)low), 또는 다른 일부 또는.
작업 예제
from fastapi import FastAPI, APIRouter, Response, Request
from starlette.background import BackgroundTask
from starlette.responses import StreamingResponse
from fastapi.routing import APIRoute
from starlette.types import Message
from typing import Callable, Dict, Any
import logging
import httpx
def log_info(req_body, res_body):
logging.info(req_body)
logging.info(res_body)
class LoggingRoute(APIRoute):
def get_route_handler(self) -> Callable:
original_route_handler = super().get_route_handler()
async def custom_route_handler(request: Request) -> Response:
req_body = await request.body()
response = await original_route_handler(request)
if isinstance(response, StreamingResponse):
res_body = b''
async for item in response.body_iterator:
res_body += item
task = BackgroundTask(log_info, req_body, res_body)
return Response(content=res_body, status_code=response.status_code,
headers=dict(response.headers), media_type=response.media_type, background=task)
else:
res_body = response.body
response.background = BackgroundTask(log_info, req_body, res_body)
return response
return custom_route_handler
app = FastAPI()
router = APIRouter(route_class=LoggingRoute)
logging.basicConfig(filename='info.log', level=logging.DEBUG)
@router.post('/')
def main(payload: Dict[Any, Any]):
return payload
@router.get('/video')
def get_video():
url = 'https://storage.googleapis.com/gtv-videos-bucket/sample/ForBiggerBlazes.mp4'
def gen():
with httpx.stream('GET', url) as r:
for chunk in r.iter_raw():
yield chunk
return StreamingResponse(gen(), media_type='video/mp4')
app.include_router(router)