Skip to content

Commit e5be506

Browse files
committed
"chore: add backed reader, writer and pipe"
1 parent 2851d9f commit e5be506

File tree

10 files changed

+2968
-4
lines changed

10 files changed

+2968
-4
lines changed
Lines changed: 332 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,332 @@
1+
package backedpipe
2+
3+
import (
4+
"context"
5+
"io"
6+
"sync"
7+
8+
"golang.org/x/xerrors"
9+
)
10+
11+
const (
12+
// DefaultBufferSize is the default buffer size for the BackedWriter (64MB)
13+
DefaultBufferSize = 64 * 1024 * 1024
14+
)
15+
16+
// ReconnectFunc is called when the BackedPipe needs to establish a new connection.
17+
// It should:
18+
// 1. Establish a new connection to the remote side
19+
// 2. Exchange sequence numbers with the remote side
20+
// 3. Return the new connection and the remote's current sequence number
21+
//
22+
// The writerSeqNum parameter is the local writer's current sequence number,
23+
// which should be sent to the remote side so it knows where to resume reading from.
24+
//
25+
// The returned readerSeqNum should be the remote side's current sequence number,
26+
// which indicates where the local reader should resume from.
27+
type ReconnectFunc func(ctx context.Context, writerSeqNum uint64) (conn io.ReadWriteCloser, readerSeqNum uint64, err error)
28+
29+
// BackedPipe provides a reliable bidirectional byte stream over unreliable network connections.
30+
// It orchestrates a BackedReader and BackedWriter to provide transparent reconnection
31+
// and data replay capabilities.
32+
type BackedPipe struct {
33+
ctx context.Context
34+
cancel context.CancelFunc
35+
mu sync.RWMutex
36+
reader *BackedReader
37+
writer *BackedWriter
38+
reconnectFn ReconnectFunc
39+
conn io.ReadWriteCloser
40+
connected bool
41+
closed bool
42+
43+
// Reconnection state
44+
reconnecting bool
45+
46+
// Error channel for receiving connection errors from reader/writer
47+
errorChan chan error
48+
49+
// Connection state notification
50+
connectionChanged chan struct{}
51+
}
52+
53+
// NewBackedPipe creates a new BackedPipe with default options and the specified reconnect function.
54+
// The pipe starts disconnected and must be connected using Connect().
55+
func NewBackedPipe(ctx context.Context, reconnectFn ReconnectFunc) *BackedPipe {
56+
pipeCtx, cancel := context.WithCancel(ctx)
57+
58+
bp := &BackedPipe{
59+
ctx: pipeCtx,
60+
cancel: cancel,
61+
reader: NewBackedReader(),
62+
writer: NewBackedWriterWithCapacity(DefaultBufferSize), // 64MB default buffer
63+
reconnectFn: reconnectFn,
64+
errorChan: make(chan error, 2), // Buffer for reader and writer errors
65+
connectionChanged: make(chan struct{}, 1),
66+
}
67+
68+
// Set up error callbacks
69+
bp.reader.SetErrorCallback(func(err error) {
70+
select {
71+
case bp.errorChan <- err:
72+
case <-bp.ctx.Done():
73+
}
74+
})
75+
76+
bp.writer.SetErrorCallback(func(err error) {
77+
select {
78+
case bp.errorChan <- err:
79+
case <-bp.ctx.Done():
80+
}
81+
})
82+
83+
// Start error handler goroutine
84+
go bp.handleErrors()
85+
86+
return bp
87+
}
88+
89+
// Connect establishes the initial connection using the reconnect function.
90+
func (bp *BackedPipe) Connect(ctx context.Context) error {
91+
bp.mu.Lock()
92+
defer bp.mu.Unlock()
93+
94+
if bp.closed {
95+
return xerrors.New("pipe is closed")
96+
}
97+
98+
if bp.connected {
99+
return xerrors.New("pipe is already connected")
100+
}
101+
102+
return bp.reconnectLocked(ctx)
103+
}
104+
105+
// Read implements io.Reader by delegating to the BackedReader.
106+
func (bp *BackedPipe) Read(p []byte) (int, error) {
107+
bp.mu.RLock()
108+
reader := bp.reader
109+
closed := bp.closed
110+
bp.mu.RUnlock()
111+
112+
if closed {
113+
return 0, io.ErrClosedPipe
114+
}
115+
116+
return reader.Read(p)
117+
}
118+
119+
// Write implements io.Writer by delegating to the BackedWriter.
120+
func (bp *BackedPipe) Write(p []byte) (int, error) {
121+
bp.mu.RLock()
122+
writer := bp.writer
123+
closed := bp.closed
124+
bp.mu.RUnlock()
125+
126+
if closed {
127+
return 0, io.ErrClosedPipe
128+
}
129+
130+
return writer.Write(p)
131+
}
132+
133+
// Close closes the pipe and all underlying connections.
134+
func (bp *BackedPipe) Close() error {
135+
bp.mu.Lock()
136+
defer bp.mu.Unlock()
137+
138+
if bp.closed {
139+
return nil
140+
}
141+
142+
bp.closed = true
143+
bp.cancel() // Cancel main context
144+
145+
// Close underlying components
146+
var readerErr, writerErr, connErr error
147+
148+
if bp.reader != nil {
149+
readerErr = bp.reader.Close()
150+
}
151+
152+
if bp.writer != nil {
153+
writerErr = bp.writer.Close()
154+
}
155+
156+
if bp.conn != nil {
157+
connErr = bp.conn.Close()
158+
bp.conn = nil
159+
}
160+
161+
bp.connected = false
162+
bp.signalConnectionChange()
163+
164+
// Return first error encountered
165+
if readerErr != nil {
166+
return readerErr
167+
}
168+
if writerErr != nil {
169+
return writerErr
170+
}
171+
return connErr
172+
}
173+
174+
// Connected returns whether the pipe is currently connected.
175+
func (bp *BackedPipe) Connected() bool {
176+
bp.mu.RLock()
177+
defer bp.mu.RUnlock()
178+
return bp.connected
179+
}
180+
181+
// signalConnectionChange signals that the connection state has changed.
182+
func (bp *BackedPipe) signalConnectionChange() {
183+
select {
184+
case bp.connectionChanged <- struct{}{}:
185+
default:
186+
// Channel is full, which is fine - we just want to signal that something changed
187+
}
188+
}
189+
190+
// reconnectLocked handles the reconnection logic. Must be called with write lock held.
191+
func (bp *BackedPipe) reconnectLocked(ctx context.Context) error {
192+
if bp.reconnecting {
193+
return xerrors.New("reconnection already in progress")
194+
}
195+
196+
bp.reconnecting = true
197+
defer func() {
198+
bp.reconnecting = false
199+
}()
200+
201+
// Close existing connection if any
202+
if bp.conn != nil {
203+
_ = bp.conn.Close()
204+
bp.conn = nil
205+
}
206+
207+
bp.connected = false
208+
bp.signalConnectionChange()
209+
210+
// Get current writer sequence number to send to remote
211+
writerSeqNum := bp.writer.SequenceNum()
212+
213+
// Unlock during reconnect attempt to avoid blocking reads/writes
214+
bp.mu.Unlock()
215+
conn, readerSeqNum, err := bp.reconnectFn(ctx, writerSeqNum)
216+
bp.mu.Lock()
217+
218+
if err != nil {
219+
return xerrors.Errorf("reconnect failed: %w", err)
220+
}
221+
222+
// Validate sequence numbers
223+
if readerSeqNum > writerSeqNum {
224+
_ = conn.Close()
225+
return xerrors.Errorf("remote sequence number %d exceeds local sequence %d, cannot replay",
226+
readerSeqNum, writerSeqNum)
227+
}
228+
229+
// Validate writer can replay from the requested sequence
230+
if !bp.writer.CanReplayFrom(readerSeqNum) {
231+
_ = conn.Close()
232+
// Calculate data loss
233+
currentSeq := bp.writer.SequenceNum()
234+
dataLoss := currentSeq - DefaultBufferSize - readerSeqNum
235+
return xerrors.Errorf("cannot replay from sequence %d (current: %d, data loss: ~%d bytes)",
236+
readerSeqNum, currentSeq, dataLoss)
237+
}
238+
239+
// Reconnect reader and writer
240+
seqNum := make(chan uint64, 1)
241+
newR := make(chan io.Reader, 1)
242+
243+
go bp.reader.Reconnect(seqNum, newR)
244+
245+
// Get sequence number and send new reader
246+
<-seqNum
247+
newR <- conn
248+
249+
err = bp.writer.Reconnect(readerSeqNum, conn)
250+
if err != nil {
251+
_ = conn.Close()
252+
return xerrors.Errorf("reconnect writer: %w", err)
253+
}
254+
255+
// Success - update state
256+
bp.conn = conn
257+
bp.connected = true
258+
bp.signalConnectionChange()
259+
260+
return nil
261+
}
262+
263+
// handleErrors listens for connection errors from reader/writer and triggers reconnection.
264+
func (bp *BackedPipe) handleErrors() {
265+
for {
266+
select {
267+
case <-bp.ctx.Done():
268+
return
269+
case err := <-bp.errorChan:
270+
// Connection error occurred
271+
bp.mu.Lock()
272+
273+
// Skip if already closed or not connected
274+
if bp.closed || !bp.connected {
275+
bp.mu.Unlock()
276+
continue
277+
}
278+
279+
// Mark as disconnected
280+
bp.connected = false
281+
bp.signalConnectionChange()
282+
283+
// Try to reconnect
284+
reconnectErr := bp.reconnectLocked(bp.ctx)
285+
bp.mu.Unlock()
286+
287+
if reconnectErr != nil {
288+
// Reconnection failed - log or handle as needed
289+
// For now, we'll just continue and wait for manual reconnection
290+
_ = err // Use the original error
291+
}
292+
}
293+
}
294+
}
295+
296+
// WaitForConnection blocks until the pipe is connected or the context is canceled.
297+
func (bp *BackedPipe) WaitForConnection(ctx context.Context) error {
298+
for {
299+
bp.mu.RLock()
300+
connected := bp.connected
301+
closed := bp.closed
302+
bp.mu.RUnlock()
303+
304+
if closed {
305+
return io.ErrClosedPipe
306+
}
307+
308+
if connected {
309+
return nil
310+
}
311+
312+
select {
313+
case <-ctx.Done():
314+
return ctx.Err()
315+
case <-bp.connectionChanged:
316+
// Connection state changed, check again
317+
}
318+
}
319+
}
320+
321+
// ForceReconnect forces a reconnection attempt immediately.
322+
// This can be used to force a reconnection if a new connection is established.
323+
func (bp *BackedPipe) ForceReconnect(ctx context.Context) error {
324+
bp.mu.Lock()
325+
defer bp.mu.Unlock()
326+
327+
if bp.closed {
328+
return io.ErrClosedPipe
329+
}
330+
331+
return bp.reconnectLocked(ctx)
332+
}

0 commit comments

Comments
 (0)