본문 바로가기

개발하자

socket.io 을 fastapi 앱에 마운트하고 연결된 모든 클라이언트에 브로드캐스트를 전송하는 방법

반응형

socket.io 을 fastapi 앱에 마운트하고 연결된 모든 클라이언트에 브로드캐스트를 전송하는 방법

웹 소켓을 사용하고 연결된 모든 클라이언트에게 메시지를 브로드캐스트할 수 있는 fastapi 어플리케이션을 만들기 위해 노력했습니다. 웹소켓으로는 불가능하다는 것을 알게 되었지만 - 광채나는 라이브러리를 발견했다. 그러나 어떻게 사용하고 기존의 fastapi 앱과 통합할 수 있는지 잘 모르겠다.




# server.py
from typing import Any

import socketio
import uvicorn
from fastapi import FastAPI

sio: Any = socketio.AsyncServer(async_mode="asgi")
socket_app = socketio.ASGIApp(sio)
app = FastAPI()


@app.get("/test")
async def test():
    print("test")
    return "WORKS"


app.mount("/", socket_app)  # Here we mount socket app to main fastapi app


@sio.on("connect")
async def connect(sid, env):
    print("on connect")


@sio.on("direct")
async def direct(sid, msg):
    print(f"direct {msg}")
    # we can send message to specific sid
    await sio.emit("event_name", msg, room=sid)


@sio.on("broadcast")
async def broadcast(sid, msg):
    print(f"broadcast {msg}")
    await sio.emit("event_name", msg)  # or send to everyone


@sio.on("disconnect")
async def disconnect(sid):
    print("on disconnect")


if __name__ == "__main__":
    kwargs = {"host": "0.0.0.0", "port": 5000}
    kwargs.update({"debug": True, "reload": True})
    uvicorn.run("server:app", **kwargs)
# client.py
import requests
import socketio

r = requests.get("http://127.0.0.1:5000/test") # server prints "test"
cl = socketio.Client()
cl2 = socketio.Client()


@cl.on("event_name")
def foo(data):
    print(f"client 1 {data}")


@cl2.on("event_name")
def foo2(data):
    print(f"client 2 {data}")


cl.connect("http://127.0.0.1:5000/") # server prints "on connect"
cl2.connect("http://127.0.0.1:5000/")
cl.emit("direct", "msg_1") # prints client 1 msg_1
cl2.emit("broadcast", "msg_2") # prints client 2 msg_2 and client 1 msg_2

마지막에 적절한 종속성을 설치합니다:

# server.py
pip install python-socketio uvicorn fastapi
# client.py
pip install requests websocket-client

반응형