Skip to content

Commit 192b0e2

Browse files
committed
initial implementation
1 parent f256a23 commit 192b0e2

File tree

10 files changed

+1429
-0
lines changed

10 files changed

+1429
-0
lines changed

agent/agent.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import (
5050
"github.com/coder/coder/v2/codersdk"
5151
"github.com/coder/coder/v2/codersdk/agentsdk"
5252
"github.com/coder/coder/v2/codersdk/workspacesdk"
53+
"github.com/coder/coder/v2/immortalstream"
5354
"github.com/coder/coder/v2/tailnet"
5455
tailnetproto "github.com/coder/coder/v2/tailnet/proto"
5556
"github.com/coder/quartz"
@@ -197,6 +198,14 @@ func New(options Options) Agent {
197198
devcontainers: options.Devcontainers,
198199
containerAPIOptions: options.DevcontainerAPIOptions,
199200
}
201+
202+
// Initialize immortal stream manager with a local dialer
203+
a.immortalStreamManager = immortalstream.NewManager(
204+
options.Logger.Named("immortal-stream"),
205+
func(network, address string) (net.Conn, error) {
206+
return net.Dial(network, address)
207+
},
208+
)
200209
// Initially, we have a closed channel, reflecting the fact that we are not initially connected.
201210
// Each time we connect we replace the channel (while holding the closeMutex) with a new one
202211
// that gets closed on disconnection. This is used to wait for graceful disconnection from the
@@ -280,6 +289,9 @@ type agent struct {
280289
devcontainers bool
281290
containerAPIOptions []agentcontainers.Option
282291
containerAPI *agentcontainers.API
292+
293+
// immortalStreamManager handles immortal stream connections
294+
immortalStreamManager *immortalstream.Manager
283295
}
284296

285297
func (a *agent) TailnetConn() *tailnet.Conn {
@@ -1930,6 +1942,10 @@ func (a *agent) Close() error {
19301942
a.logger.Error(a.hardCtx, "container API close", slog.Error(err))
19311943
}
19321944

1945+
if err := a.immortalStreamManager.Close(); err != nil {
1946+
a.logger.Error(a.hardCtx, "immortal stream manager close", slog.Error(err))
1947+
}
1948+
19331949
// Wait for the graceful shutdown to complete, but don't wait forever so
19341950
// that we don't break user expectations.
19351951
go func() {

agent/api.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ func (a *agent) apiHandler() http.Handler {
6666
r.Get("/debug/manifest", a.HandleHTTPDebugManifest)
6767
r.Get("/debug/prometheus", promHandler.ServeHTTP)
6868

69+
// Mount immortal stream routes
70+
r.Mount("/api/v0/immortal-stream", newImmortalStreamHandler(a.immortalStreamManager).Routes())
71+
6972
return r
7073
}
7174

agent/immortalstream.go

Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
package agent
2+
3+
import (
4+
"encoding/json"
5+
"net"
6+
"net/http"
7+
"strconv"
8+
9+
"github.com/go-chi/chi/v5"
10+
"github.com/google/uuid"
11+
"golang.org/x/xerrors"
12+
13+
"github.com/coder/coder/v2/coderd/httpapi"
14+
"github.com/coder/coder/v2/codersdk"
15+
"github.com/coder/coder/v2/immortalstream"
16+
)
17+
18+
// immortalStreamHandler handles HTTP requests for immortal streams.
19+
type immortalStreamHandler struct {
20+
manager *immortalstream.Manager
21+
}
22+
23+
// newImmortalStreamHandler creates a new immortal stream handler.
24+
func newImmortalStreamHandler(manager *immortalstream.Manager) *immortalStreamHandler {
25+
return &immortalStreamHandler{
26+
manager: manager,
27+
}
28+
}
29+
30+
// Routes sets up the immortal stream routes.
31+
func (h *immortalStreamHandler) Routes() chi.Router {
32+
r := chi.NewRouter()
33+
34+
r.Post("/", h.createStream)
35+
r.Get("/", h.listStreams)
36+
r.Get("/{id}", h.connectToStream)
37+
r.Delete("/{id}", h.deleteStream)
38+
39+
return r
40+
}
41+
42+
// createStream handles POST /api/v0/immortal-stream
43+
func (h *immortalStreamHandler) createStream(w http.ResponseWriter, r *http.Request) {
44+
ctx := r.Context()
45+
46+
var req immortalstream.CreateStreamRequest
47+
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
48+
httpapi.Write(ctx, w, http.StatusBadRequest, codersdk.Response{
49+
Message: "Invalid request body",
50+
Detail: err.Error(),
51+
})
52+
return
53+
}
54+
55+
resp, err := h.manager.CreateStream(ctx, req)
56+
if err != nil {
57+
if xerrors.Is(err, xerrors.Errorf("Too many Immortal Streams")) {
58+
httpapi.Write(ctx, w, http.StatusServiceUnavailable, codersdk.Response{
59+
Message: "Too many Immortal Streams",
60+
})
61+
return
62+
}
63+
if xerrors.Is(err, xerrors.Errorf("The connection was refused")) {
64+
httpapi.Write(ctx, w, http.StatusNotFound, codersdk.Response{
65+
Message: "The connection was refused",
66+
})
67+
return
68+
}
69+
httpapi.Write(ctx, w, http.StatusInternalServerError, codersdk.Response{
70+
Message: "Failed to create immortal stream",
71+
Detail: err.Error(),
72+
})
73+
return
74+
}
75+
76+
httpapi.Write(ctx, w, http.StatusCreated, resp)
77+
}
78+
79+
// listStreams handles GET /api/v0/immortal-stream
80+
func (h *immortalStreamHandler) listStreams(w http.ResponseWriter, r *http.Request) {
81+
ctx := r.Context()
82+
83+
streams := h.manager.ListStreams()
84+
httpapi.Write(ctx, w, http.StatusOK, streams)
85+
}
86+
87+
// connectToStream handles GET /api/v0/immortal-stream/{id}
88+
func (h *immortalStreamHandler) connectToStream(w http.ResponseWriter, r *http.Request) {
89+
ctx := r.Context()
90+
91+
// Parse stream ID from URL
92+
idStr := chi.URLParam(r, "id")
93+
streamID, err := uuid.Parse(idStr)
94+
if err != nil {
95+
httpapi.Write(ctx, w, http.StatusBadRequest, codersdk.Response{
96+
Message: "Invalid stream ID",
97+
Detail: err.Error(),
98+
})
99+
return
100+
}
101+
102+
// Check if the stream exists
103+
stream, err := h.manager.GetStream(streamID)
104+
if err != nil {
105+
httpapi.Write(ctx, w, http.StatusNotFound, codersdk.Response{
106+
Message: "Stream not found",
107+
})
108+
return
109+
}
110+
111+
// Check for upgrade headers
112+
if r.Header.Get("Upgrade") != "coder-immortal-stream" || r.Header.Get("Connection") != "upgrade" {
113+
httpapi.Write(ctx, w, http.StatusBadRequest, codersdk.Response{
114+
Message: "Missing required upgrade headers",
115+
Detail: "Expected Upgrade: coder-immortal-stream and Connection: upgrade",
116+
})
117+
return
118+
}
119+
120+
// Parse sequence numbers from headers
121+
readerSeqNum, err := parseSequenceNumber(r.Header.Get("x-coder-immortal-stream-sequence-num"))
122+
if err != nil {
123+
httpapi.Write(ctx, w, http.StatusBadRequest, codersdk.Response{
124+
Message: "Invalid reader sequence number",
125+
Detail: err.Error(),
126+
})
127+
return
128+
}
129+
130+
// Get current sequence numbers for the response header
131+
_, currentWriterSeq := stream.GetSequenceNumbers()
132+
133+
// Upgrade the connection
134+
hijacker, ok := w.(http.Hijacker)
135+
if !ok {
136+
httpapi.Write(ctx, w, http.StatusInternalServerError, codersdk.Response{
137+
Message: "Connection hijacking not supported",
138+
})
139+
return
140+
}
141+
142+
// Send upgrade response
143+
w.Header().Set("Upgrade", "coder-immortal-stream")
144+
w.Header().Set("Connection", "upgrade")
145+
w.Header().Set("x-coder-immortal-stream-sequence-num", strconv.FormatInt(currentWriterSeq, 10))
146+
w.WriteHeader(http.StatusSwitchingProtocols)
147+
148+
// Hijack the connection
149+
rawConn, _, err := hijacker.Hijack()
150+
if err != nil {
151+
httpapi.Write(ctx, w, http.StatusInternalServerError, codersdk.Response{
152+
Message: "Failed to hijack connection",
153+
Detail: err.Error(),
154+
})
155+
return
156+
}
157+
158+
// Connect to the immortal stream
159+
err = h.manager.ConnectToStream(ctx, streamID, rawConn, readerSeqNum, currentWriterSeq)
160+
if err != nil {
161+
_ = rawConn.Close()
162+
return
163+
}
164+
165+
// The connection is now managed by the immortal stream
166+
}
167+
168+
// deleteStream handles DELETE /api/v0/immortal-stream/{id}
169+
func (h *immortalStreamHandler) deleteStream(w http.ResponseWriter, r *http.Request) {
170+
ctx := r.Context()
171+
172+
// Parse stream ID from URL
173+
idStr := chi.URLParam(r, "id")
174+
streamID, err := uuid.Parse(idStr)
175+
if err != nil {
176+
httpapi.Write(ctx, w, http.StatusBadRequest, codersdk.Response{
177+
Message: "Invalid stream ID",
178+
Detail: err.Error(),
179+
})
180+
return
181+
}
182+
183+
err = h.manager.DeleteStream(streamID)
184+
if err != nil {
185+
if xerrors.Is(err, xerrors.Errorf("stream not found")) {
186+
httpapi.Write(ctx, w, http.StatusNotFound, codersdk.Response{
187+
Message: "Stream not found",
188+
})
189+
return
190+
}
191+
httpapi.Write(ctx, w, http.StatusInternalServerError, codersdk.Response{
192+
Message: "Failed to delete stream",
193+
Detail: err.Error(),
194+
})
195+
return
196+
}
197+
198+
w.WriteHeader(http.StatusNoContent)
199+
}
200+
201+
// parseSequenceNumber parses a sequence number from a header value.
202+
func parseSequenceNumber(value string) (int64, error) {
203+
if value == "" {
204+
return 0, nil
205+
}
206+
207+
seqNum, err := strconv.ParseInt(value, 10, 64)
208+
if err != nil {
209+
return 0, xerrors.Errorf("parse sequence number: %w", err)
210+
}
211+
212+
return seqNum, nil
213+
}
214+
215+
// httpUpgradeConn wraps a net.Conn to implement io.ReadWriteCloser for HTTP upgrade connections.
216+
type httpUpgradeConn struct {
217+
net.Conn
218+
}
219+
220+
func (c *httpUpgradeConn) Read(b []byte) (int, error) {
221+
return c.Conn.Read(b)
222+
}
223+
224+
func (c *httpUpgradeConn) Write(b []byte) (int, error) {
225+
return c.Conn.Write(b)
226+
}
227+
228+
func (c *httpUpgradeConn) Close() error {
229+
return c.Conn.Close()
230+
}

0 commit comments

Comments
 (0)