package api

import (
	"bufio"
	"io"
	"net/http"

	"github.com/go-chi/chi/v5"
	"github.com/google/uuid"
	"nhooyr.io/websocket"

	"github.com/agentsphere/agent-mgr/internal/provider"
	"github.com/agentsphere/agent-mgr/internal/provider/claudecode"
)

// streamSession upgrades the request to a WebSocket and forwards the output
// of the session's provider to the client, one text message per line.
//
// Before the upgrade it answers with a plain HTTP error:
//   - 400 when the "sessionID" URL parameter is not a valid UUID,
//   - 404 when the store lookup fails or yields no session,
//   - 500 when the session's provider is not present in the registry.
//
// After the upgrade, failures are reported through the WebSocket close
// status: StatusInternalError when the output stream cannot be opened or
// breaks off mid-way, StatusNormalClosure when the stream ends cleanly.
func (s *Server) streamSession(w http.ResponseWriter, r *http.Request) {
	// Upper bound for a single output line; it is used both as the scanner's
	// initial buffer size and as its maximum token size.
	const maxLineBytes = 64 * 1024

	sessID, err := uuid.Parse(chi.URLParam(r, "sessionID"))
	if err != nil {
		http.Error(w, "invalid session id", http.StatusBadRequest)
		return
	}

	sess, err := s.store.GetSession(r.Context(), sessID)
	if err != nil || sess == nil {
		http.Error(w, "session not found", http.StatusNotFound)
		return
	}

	p, err := s.registry.Get(sess.Provider)
	if err != nil {
		http.Error(w, "provider not found", http.StatusInternalServerError)
		return
	}

	handle := &provider.SessionHandle{SessionID: sess.ID, PodName: sess.PodName}

	conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{
		InsecureSkipVerify: true, // Traefik handles TLS/origin
	})
	if err != nil {
		s.log.Error("websocket accept failed", "err", err)
		return
	}
	// Safety net: releases the connection on every return path, including
	// those where a graceful Close was already attempted.
	defer conn.CloseNow()

	// This endpoint only sends data; the context returned by CloseRead is
	// passed to both the provider stream and every write below.
	ctx := conn.CloseRead(r.Context())

	stream, err := p.StreamOutput(ctx, handle)
	if err != nil {
		s.log.Error("stream output failed", "err", err, "pod", sess.PodName)
		conn.Close(websocket.StatusInternalError, "failed to stream logs")
		return
	}
	defer stream.Close()

	tracker := claudecode.NewProgressTracker()
	scanner := bufio.NewScanner(stream)
	// A line longer than maxLineBytes makes Scan stop and Err report the
	// failure, which is surfaced to the client below.
	scanner.Buffer(make([]byte, maxLineBytes), maxLineBytes)

	for scanner.Scan() {
		line := scanner.Text()
		tracker.ProcessLine(line)
		if err := conn.Write(ctx, websocket.MessageText, []byte(line)); err != nil {
			// The client can no longer be written to; stop forwarding.
			break
		}
	}

	if err := scanner.Err(); err != nil && err != io.EOF {
		s.log.Warn("stream scanner error", "err", err)
		// The output was cut short: do not present this to the client as a
		// clean end of stream.
		conn.Close(websocket.StatusInternalError, "stream interrupted")
		return
	}

	conn.Close(websocket.StatusNormalClosure, "stream ended")
}