반응형
Kubernetes로 배포하고 Ingress를 통해 연결한 후 SSE가 끊어짐
이벤트 스트림을 사용하는 ReactJS 클라이언트와 SSE를 구현한 Golang 백엔드가 있습니다.
브라우저를 로컬호스트에서 실행 중인 백엔드에 연결했을 때와, 백엔드를 k8s에서 실행하고 포트 포워딩으로 연결했을 때는 모든 것이 정상적으로 작동하는 것처럼 보였습니다.
(매번 포트 포워딩을 하지 않아도 되도록) 호스트 이름을 사용하는 Ingress를 생성하자마자 SSE가 작동을 멈췄습니다. 클라이언트가 요청을 보내고 백엔드가 그 요청을 수신해 클라이언트로 등록하는 것까지는 여전히 확인됩니다. 그러나 백엔드에서 이벤트를 전송해도 ReactJS 앱에는 도착하지 않습니다.
백엔드 SSE 구현을 위한 코드를 첨부합니다:
package sse
import (
"encoding/json"
"fmt"
"net/http"
"time"
"go.uber.org/zap"
"github.com/talon-one/towers/controller/api/log"
)
// patience is how long to wait when pushing a message to a slow client,
// or to a client that closed after `range clients` started.
const patience = 2 * time.Second
type (
	// sseEvent is the envelope embedded in every event payload. Its
	// EventType is serialized as "event_type".
	sseEvent struct {
		EventType string `json:"event_type"`
	}

	// contentUpdate is an event that carries nothing beyond its event type.
	contentUpdate struct {
		sseEvent
	}

	// customerStateUpdate is an event that carries a customer's name and
	// state alongside the event type.
	customerStateUpdate struct {
		sseEvent
		CustomerName  string `json:"customer_name"`
		CustomerState string `json:"customer_state"`
	}
)
// Broker keeps a registry of per-client message channels and relays every
// payload received on Notifier to each registered client.
type Broker struct {
	// Notifier is the inbound channel: the main events-gathering routine
	// pushes serialized events here.
	Notifier chan []byte

	// newClients carries the message channel of a client that just
	// connected; closingClients carries the channel of a client that
	// is going away.
	newClients, closingClients chan chan []byte

	// clients is the registry of currently connected client channels.
	clients map[chan []byte]bool

	log *log.Logger
}
// NewBroker allocates a Broker, tags the supplied logger with
// component "SSE", and starts the broker's listen loop in its own
// goroutine before returning.
func NewBroker(log *log.Logger) (broker *Broker) {
	broker = new(Broker)

	// Notifier gets a one-slot buffer; the client bookkeeping channels are
	// unbuffered.
	broker.Notifier = make(chan []byte, 1)
	broker.newClients = make(chan chan []byte)
	broker.closingClients = make(chan chan []byte)
	broker.clients = make(map[chan []byte]bool)
	broker.log = log.With(zap.String("component", "SSE"))

	// Start listening and broadcasting in the background.
	go broker.listen()

	return broker
}
// HandleContentChange serializes a "contentUpdate" event to JSON and pushes
// it onto the broker's Notifier channel. It returns the marshalling error,
// if any, and nil once the payload has been handed to the channel.
func (broker *Broker) HandleContentChange() error {
	payload, err := json.Marshal(contentUpdate{
		sseEvent: sseEvent{EventType: "contentUpdate"},
	})
	if err != nil {
		return err
	}

	broker.Notifier <- payload
	return nil
}
// HandleCustomerStateChange logs the change, serializes a
// "customerStateUpdate" event carrying the customer's name and state, and
// pushes the JSON payload onto the broker's Notifier channel. It returns the
// marshalling error, if any, and nil once the payload has been handed over.
func (broker *Broker) HandleCustomerStateChange(name, state string) error {
	broker.log.Info(
		"Sending SSE to registered clients",
		zap.String("name", name),
		zap.String("state", state),
	)

	payload, err := json.Marshal(customerStateUpdate{
		sseEvent:      sseEvent{EventType: "customerStateUpdate"},
		CustomerName:  name,
		CustomerState: state,
	})
	if err != nil {
		return err
	}

	broker.Notifier <- payload
	return nil
}
// ServeHTTP streams broker events to a single client as Server-Sent Events.
//
// It replies with 500 if the ResponseWriter cannot flush. Otherwise it sets
// the event-stream headers, registers a private message channel with the
// broker, and writes every message received on that channel as a
// "data: ...\n\n" frame, flushing after each one. The handler returns (and
// deregisters its channel) when the request context is cancelled or a write
// to the client fails.
func (broker *Broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
	// Streaming is impossible without the ability to flush partial output.
	flusher, ok := rw.(http.Flusher)
	if !ok {
		http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
		return
	}

	header := rw.Header()
	header.Set("Content-Type", "text/event-stream")
	header.Set("Cache-Control", "no-cache")
	header.Set("Connection", "keep-alive")
	header.Set("Access-Control-Allow-Origin", "*")
	// Ask nginx-based reverse proxies not to buffer this response; a
	// buffering proxy holds events back instead of forwarding them as they
	// are written.
	header.Set("X-Accel-Buffering", "no")

	// Each connection registers its own message channel with the broker.
	messageChan := make(chan []byte)
	broker.newClients <- messageChan

	// Remove this client from the registry when the handler exits.
	defer func() {
		broker.closingClients <- messageChan
	}()

	// Send the headers right away so the client sees the stream open
	// before the first event arrives.
	flusher.Flush()

	// The request context is cancelled when the client connection closes.
	// This replaces the deprecated http.CloseNotifier, whose unchecked type
	// assertion could panic on writers that do not implement it.
	done := req.Context().Done()

	for {
		select {
		case <-done:
			return
		case msg := <-messageChan:
			// Server-Sent Events framing: one data line, then a blank line.
			if _, err := fmt.Fprintf(rw, "data: %s\n\n", msg); err != nil {
				// The connection is unusable; stop and deregister.
				return
			}
			// Push the frame out immediately instead of buffering it.
			flusher.Flush()
		}
	}
}
// listen is the broker's event loop. It never returns: it registers
// channels arriving on newClients, drops channels arriving on
// closingClients, and forwards every payload from Notifier to each
// registered client, waiting at most `patience` per client.
func (broker *Broker) listen() {
	for {
		select {
		case joined := <-broker.newClients:
			// A client connected: start tracking its message channel.
			broker.clients[joined] = true
			broker.log.Info("Client added", zap.Int("current_count", len(broker.clients)))

		case left := <-broker.closingClients:
			// A client detached: stop sending it messages.
			delete(broker.clients, left)
			broker.log.Info("Client removed", zap.Int("current_count", len(broker.clients)))

		case payload := <-broker.Notifier:
			// An event came in from outside: offer it to every client.
			for client := range broker.clients {
				timeout := time.After(patience)
				select {
				case client <- payload:
				case <-timeout:
					broker.log.Info("Skipping client")
				}
			}
		}
	}
}
그리고 내 ReactJS 앱에서:
export default class CustomersTable extends Component {
constructor(props) {
super(props)
this.eventSource = new EventSource('/v1/events')
}
updateCustomerState(e) {
let event = JSON.parse(e.data)
switch (event.event_type) {
case 'customerStateUpdate':
let newData = this.state.customers.map(item => {
if (item.name === event.customer_name) {
item.k8sState = event.customer_state
}
return item
})
this.setState(Object.assign({}, { customers: newData }))
break
case 'contentUpdate':
this.reload()
break
default:
break
}
}
componentDidMount() {
this.setState({ isLoading: true })
ReactModal.setAppElement('body')
this.reload()
this.eventSource.onmessage = e => this.updateCustomerState(e)
}
componentWillUnmount() {
this.eventSource.close()
}
...
Nginx Ingress에서 다음 어노테이션을 사용해 SSE 앱이 동작하도록 만들었습니다:
# Annotations the post reports using to get an SSE app working behind the
# Nginx Ingress.
annotations:
# Raises the proxy read timeout so a long-lived stream is not cut off;
# presumably the value is in seconds (21600 = 6 h) — confirm against the
# ingress-nginx annotation documentation.
nginx.ingress.kubernetes.io/proxy-read-timeout: "21600"
# NOTE(review): the post itself remarks that no "eventsource" entry exists
# among the ingress annotations — verify that the controller in use
# recognizes this key before relying on it.
nginx.ingress.kubernetes.io/eventsource: "true"
참고: Ingress 어노테이션 목록에는 "nginx.ingress.kubernetes.io/eventsource" 항목이 없습니다.
반응형
'개발하자' 카테고리의 다른 글
kubernetes에서 DNS를 확인할 수 없습니다 (0) | 2023.10.16 |
---|---|
Python 기존 XLSM에 매크로 추가 (0) | 2023.10.16 |
helm의 '--dry-run' 옵션은 kubernetes API 서버에 연결이 필요한가? 면방향 연결 오류 (0) | 2023.10.15 |
창에서 주피터 노트북에서 사용하는 기본 브라우저를 변경하는 방법 (0) | 2023.10.14 |
Gmail API(피톤)를 이용하여 메시지 본문 전체를 검색하는 방법 (1) | 2023.10.14 |