Skip to content

Commit 9dd9c4a

Browse files
committed
backed reader, writer and pipe implementation
1 parent e80f91e commit 9dd9c4a

File tree

9 files changed

+2876
-0
lines changed

9 files changed

+2876
-0
lines changed
Lines changed: 350 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,350 @@
1+
package backedpipe
2+
3+
import (
4+
"context"
5+
"io"
6+
"sync"
7+
"time"
8+
9+
"golang.org/x/xerrors"
10+
)
11+
12+
const (
13+
// DefaultBufferSize is the default buffer size for the BackedWriter (64MB)
14+
DefaultBufferSize = 64 * 1024 * 1024
15+
)
16+
17+
// ReconnectFunc is called when the BackedPipe needs to establish a new connection.
18+
// It should:
19+
// 1. Establish a new connection to the remote side
20+
// 2. Exchange sequence numbers with the remote side
21+
// 3. Return the new connection and the remote's current sequence number
22+
//
23+
// The writerSeqNum parameter is the local writer's current sequence number,
24+
// which should be sent to the remote side so it knows where to resume reading from.
25+
//
26+
// The returned readerSeqNum should be the remote side's current sequence number,
27+
// which indicates where the local reader should resume from.
28+
type ReconnectFunc func(ctx context.Context, writerSeqNum uint64) (conn io.ReadWriteCloser, readerSeqNum uint64, err error)
29+
30+
// BackedPipe provides a reliable bidirectional byte stream over unreliable network connections.
31+
// It orchestrates a BackedReader and BackedWriter to provide transparent reconnection
32+
// and data replay capabilities.
33+
type BackedPipe struct {
34+
ctx context.Context
35+
cancel context.CancelFunc
36+
mu sync.RWMutex
37+
reader *BackedReader
38+
writer *BackedWriter
39+
reconnectFn ReconnectFunc
40+
conn io.ReadWriteCloser
41+
connected bool
42+
closed bool
43+
44+
// Reconnection state
45+
reconnecting bool
46+
47+
// Monitoring state
48+
monitorRunning bool
49+
monitorCancel context.CancelFunc
50+
51+
// Connection state notification
52+
connectionChanged chan struct{}
53+
}
54+
55+
// NewBackedPipe creates a new BackedPipe with default options and the specified reconnect function.
56+
// The pipe starts disconnected and must be connected using Connect().
57+
func NewBackedPipe(ctx context.Context, reconnectFn ReconnectFunc) *BackedPipe {
58+
pipeCtx, cancel := context.WithCancel(ctx)
59+
60+
bp := &BackedPipe{
61+
ctx: pipeCtx,
62+
cancel: cancel,
63+
reader: NewBackedReader(),
64+
writer: NewBackedWriterWithCapacity(DefaultBufferSize), // 64MB default buffer
65+
reconnectFn: reconnectFn,
66+
connectionChanged: make(chan struct{}, 1),
67+
}
68+
69+
return bp
70+
}
71+
72+
// Connect establishes the initial connection using the reconnect function.
73+
func (bp *BackedPipe) Connect(ctx context.Context) error {
74+
bp.mu.Lock()
75+
defer bp.mu.Unlock()
76+
77+
if bp.closed {
78+
return xerrors.New("pipe is closed")
79+
}
80+
81+
if bp.connected {
82+
return xerrors.New("pipe is already connected")
83+
}
84+
85+
return bp.reconnectLocked(ctx)
86+
}
87+
88+
// Read implements io.Reader by delegating to the BackedReader.
89+
func (bp *BackedPipe) Read(p []byte) (int, error) {
90+
bp.mu.RLock()
91+
reader := bp.reader
92+
closed := bp.closed
93+
bp.mu.RUnlock()
94+
95+
if closed {
96+
return 0, io.ErrClosedPipe
97+
}
98+
99+
return reader.Read(p)
100+
}
101+
102+
// Write implements io.Writer by delegating to the BackedWriter.
103+
func (bp *BackedPipe) Write(p []byte) (int, error) {
104+
bp.mu.RLock()
105+
writer := bp.writer
106+
closed := bp.closed
107+
bp.mu.RUnlock()
108+
109+
if closed {
110+
return 0, io.ErrClosedPipe
111+
}
112+
113+
return writer.Write(p)
114+
}
115+
116+
// Close closes the pipe and all underlying connections.
117+
func (bp *BackedPipe) Close() error {
118+
bp.mu.Lock()
119+
defer bp.mu.Unlock()
120+
121+
if bp.closed {
122+
return nil
123+
}
124+
125+
bp.closed = true
126+
127+
// Cancel monitor goroutine
128+
if bp.monitorCancel != nil {
129+
bp.monitorCancel()
130+
bp.monitorCancel = nil
131+
}
132+
133+
bp.cancel() // Cancel main context
134+
135+
// Close underlying components
136+
var readerErr, writerErr, connErr error
137+
138+
if bp.reader != nil {
139+
readerErr = bp.reader.Close()
140+
}
141+
142+
if bp.writer != nil {
143+
writerErr = bp.writer.Close()
144+
}
145+
146+
if bp.conn != nil {
147+
connErr = bp.conn.Close()
148+
bp.conn = nil
149+
}
150+
151+
bp.connected = false
152+
bp.signalConnectionChange()
153+
154+
// Return first error encountered
155+
if readerErr != nil {
156+
return readerErr
157+
}
158+
if writerErr != nil {
159+
return writerErr
160+
}
161+
return connErr
162+
}
163+
164+
// Connected returns whether the pipe is currently connected.
165+
func (bp *BackedPipe) Connected() bool {
166+
bp.mu.RLock()
167+
defer bp.mu.RUnlock()
168+
return bp.connected
169+
}
170+
171+
// signalConnectionChange signals that the connection state has changed.
172+
func (bp *BackedPipe) signalConnectionChange() {
173+
select {
174+
case bp.connectionChanged <- struct{}{}:
175+
default:
176+
// Channel is full, which is fine - we just want to signal that something changed
177+
}
178+
}
179+
180+
// reconnectLocked handles the reconnection logic. Must be called with write lock held.
181+
func (bp *BackedPipe) reconnectLocked(ctx context.Context) error {
182+
if bp.reconnecting {
183+
return xerrors.New("reconnection already in progress")
184+
}
185+
186+
bp.reconnecting = true
187+
defer func() {
188+
bp.reconnecting = false
189+
}()
190+
191+
// Close existing connection if any
192+
if bp.conn != nil {
193+
bp.conn.Close()
194+
bp.conn = nil
195+
}
196+
197+
bp.connected = false
198+
bp.signalConnectionChange()
199+
200+
// Get current writer sequence number to send to remote
201+
writerSeqNum := bp.writer.SequenceNum()
202+
203+
// Unlock during reconnect attempt to avoid blocking reads/writes
204+
bp.mu.Unlock()
205+
conn, readerSeqNum, err := bp.reconnectFn(ctx, writerSeqNum)
206+
bp.mu.Lock()
207+
208+
if err != nil {
209+
return xerrors.Errorf("reconnect failed: %w", err)
210+
}
211+
212+
// Validate sequence numbers
213+
if readerSeqNum > writerSeqNum {
214+
conn.Close()
215+
return xerrors.Errorf("remote sequence number %d exceeds local sequence %d, cannot replay",
216+
readerSeqNum, writerSeqNum)
217+
}
218+
219+
// Validate writer can replay from the requested sequence
220+
if !bp.writer.CanReplayFrom(readerSeqNum) {
221+
conn.Close()
222+
// Calculate data loss
223+
var currentSeq = bp.writer.SequenceNum()
224+
var dataLoss = currentSeq - DefaultBufferSize - readerSeqNum
225+
return xerrors.Errorf("cannot replay from sequence %d (current: %d, data loss: ~%d bytes)",
226+
readerSeqNum, currentSeq, dataLoss)
227+
}
228+
229+
// Reconnect reader and writer
230+
seqNum := make(chan uint64, 1)
231+
newR := make(chan io.Reader, 1)
232+
233+
go bp.reader.Reconnect(seqNum, newR)
234+
235+
// Get sequence number and send new reader
236+
<-seqNum
237+
newR <- conn
238+
239+
err = bp.writer.Reconnect(ctx, readerSeqNum, conn)
240+
if err != nil {
241+
conn.Close()
242+
return xerrors.Errorf("reconnect writer: %w", err)
243+
}
244+
245+
// Success - update state
246+
bp.conn = conn
247+
bp.connected = true
248+
bp.signalConnectionChange()
249+
250+
// Cancel existing monitor if running
251+
if bp.monitorCancel != nil {
252+
bp.monitorCancel()
253+
bp.monitorCancel = nil
254+
}
255+
256+
// Start monitoring for connection failures
257+
monitorCtx, cancel := context.WithCancel(bp.ctx)
258+
bp.monitorCancel = cancel
259+
bp.monitorRunning = true
260+
261+
go bp.monitorConnection(monitorCtx)
262+
263+
return nil
264+
}
265+
266+
// monitorConnection monitors the connection and triggers reconnection on failure.
267+
func (bp *BackedPipe) monitorConnection(ctx context.Context) {
268+
defer func() {
269+
bp.mu.Lock()
270+
bp.monitorRunning = false
271+
bp.mu.Unlock()
272+
}()
273+
274+
ticker := time.NewTicker(time.Second)
275+
defer ticker.Stop()
276+
277+
for {
278+
select {
279+
case <-ctx.Done():
280+
return
281+
case <-ticker.C:
282+
bp.mu.RLock()
283+
connected := bp.connected
284+
closed := bp.closed
285+
readerConnected := bp.reader.Connected()
286+
writerConnected := bp.writer.Connected()
287+
bp.mu.RUnlock()
288+
289+
if closed {
290+
return
291+
}
292+
293+
// Check if either reader or writer has disconnected
294+
if connected && (!readerConnected || !writerConnected) {
295+
bp.mu.Lock()
296+
bp.connected = false
297+
bp.signalConnectionChange()
298+
bp.mu.Unlock()
299+
300+
// Try to reconnect once - if it fails, error out
301+
bp.mu.Lock()
302+
err := bp.reconnectLocked(ctx)
303+
bp.mu.Unlock()
304+
305+
if err != nil {
306+
// Reconnection failed - don't retry
307+
return
308+
}
309+
}
310+
}
311+
}
312+
}
313+
314+
// WaitForConnection blocks until the pipe is connected or the context is canceled.
315+
func (bp *BackedPipe) WaitForConnection(ctx context.Context) error {
316+
for {
317+
bp.mu.RLock()
318+
connected := bp.connected
319+
closed := bp.closed
320+
bp.mu.RUnlock()
321+
322+
if closed {
323+
return io.ErrClosedPipe
324+
}
325+
326+
if connected {
327+
return nil
328+
}
329+
330+
select {
331+
case <-ctx.Done():
332+
return ctx.Err()
333+
case <-bp.connectionChanged:
334+
// Connection state changed, check again
335+
}
336+
}
337+
}
338+
339+
// ForceReconnect forces a reconnection attempt immediately.
340+
// This can be used to force a reconnection if a new connection is established.
341+
func (bp *BackedPipe) ForceReconnect(ctx context.Context) error {
342+
bp.mu.Lock()
343+
defer bp.mu.Unlock()
344+
345+
if bp.closed {
346+
return io.ErrClosedPipe
347+
}
348+
349+
return bp.reconnectLocked(ctx)
350+
}

0 commit comments

Comments
 (0)