Skip to content

Commit 1c88cf4

Browse files
committed
add remote peer capabilities with grpc
1 parent 69ac534 commit 1c88cf4

File tree

15 files changed

+1564
-227
lines changed

15 files changed

+1564
-227
lines changed

chord/chord.go

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
package chord
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"github.com/yousuf64/chord-kv/node"
8+
"github.com/yousuf64/chord-kv/util"
9+
"log"
10+
"math"
11+
)
12+
13+
type Core interface {
14+
node.Node
15+
16+
Join(ctx context.Context, n node.Node)
17+
Stabilize() error
18+
CheckPredecessor()
19+
FixFinger(fingerNumber int) error
20+
}
21+
22+
type Chord struct {
23+
id uint64
24+
addr string
25+
successor node.Node
26+
predecessor node.Node
27+
finger [util.M]node.Node
28+
}
29+
30+
func NewChord(addr string) *Chord {
31+
c := &Chord{
32+
id: util.Hash(addr),
33+
addr: addr,
34+
successor: nil,
35+
predecessor: nil,
36+
finger: [util.M]node.Node{},
37+
}
38+
c.successor = c
39+
40+
return c
41+
}
42+
43+
func (c *Chord) ID() uint64 {
44+
return c.id
45+
}
46+
47+
func (c *Chord) Addr() string {
48+
return c.addr
49+
}
50+
51+
func (c *Chord) FindSuccessor(ctx context.Context, id uint64) (node.Node, error) {
52+
if util.Between(id, c.id, c.successor.ID()) {
53+
return c.successor, nil
54+
}
55+
56+
closestNode := c.closestPrecedingNode(id)
57+
if closestNode.ID() == c.ID() {
58+
return c, nil
59+
}
60+
if closestNode.ID() == id {
61+
return closestNode, nil
62+
}
63+
64+
return closestNode.FindSuccessor(ctx, id)
65+
}
66+
67+
func (c *Chord) closestPrecedingNode(id uint64) node.Node {
68+
for i := util.M - 1; i >= 0; i-- {
69+
if c.finger[i] != nil && util.Between(c.finger[i].ID(), c.ID(), id) {
70+
return c.finger[i]
71+
}
72+
}
73+
74+
return c
75+
}
76+
77+
func (c *Chord) Notify(ctx context.Context, p node.Node) error {
78+
if c.predecessor == nil || util.Between(p.ID(), c.predecessor.ID(), c.ID()) {
79+
// TODO: Transfer data
80+
c.predecessor = p
81+
log.Printf("%s [%d]: (Notify) predecessor changed %d", c.Addr(), c.ID(), c.predecessor.ID())
82+
}
83+
84+
return nil
85+
}
86+
87+
func (c *Chord) GetPredecessor(ctx context.Context) (node.Node, error) {
88+
if c.predecessor == nil {
89+
return nil, errors.New("no predecessor")
90+
}
91+
92+
// TODO: was NewChord(c.predecessor.Addr(), nil)
93+
return c.predecessor, nil
94+
}
95+
96+
func (c *Chord) Join(ctx context.Context, n node.Node) {
97+
if n == nil {
98+
return
99+
}
100+
101+
c.predecessor = nil
102+
reply, err := n.FindSuccessor(ctx, c.ID())
103+
if err != nil {
104+
panic(err)
105+
}
106+
107+
//sp := NewChord(reply.Addr(), nil)
108+
//sp, err := c.newNodeFn(reply.Addr())
109+
//if err != nil {
110+
// panic(err)
111+
//}
112+
113+
c.successor = reply
114+
err = c.successor.Notify(ctx, c)
115+
if err != nil {
116+
panic(err)
117+
}
118+
}
119+
120+
func (c *Chord) Stabilize() error {
121+
x, err := c.successor.GetPredecessor(context.Background())
122+
if err != nil {
123+
if err.Error() != "no predecessor" {
124+
panic(err)
125+
}
126+
}
127+
128+
if x != nil && util.Between(x.ID(), c.ID(), c.successor.ID()) {
129+
c.successor = x
130+
log.Printf("%s [%d]: Stabilized successor %d", c.Addr(), c.ID(), c.successor.ID())
131+
//n.successor.Notify(n)
132+
}
133+
134+
if c.successor.ID() != c.ID() {
135+
log.Printf("%s [%d]: Notified successor %d", c.Addr(), c.ID(), c.successor.ID())
136+
err = c.successor.Notify(context.Background(), c)
137+
if err != nil {
138+
panic(err)
139+
}
140+
}
141+
142+
return nil
143+
}
144+
145+
func (c *Chord) CheckPredecessor() {
146+
if c.predecessor != nil {
147+
// TODO: Better to have retries with a backoff policy to avoid temporary failures/false alarms
148+
//err := c.predecessor.Healthz()
149+
//if err != nil {
150+
// log.Printf("%s [%d]: predecessor { %d: %s } not healthy", n.Addr, n.Id, n.Predecessor.Id, n.Predecessor.Addr)
151+
//}
152+
}
153+
}
154+
155+
func (c *Chord) FixFinger(fingerNumber int) error {
156+
if fingerNumber < 0 {
157+
return errors.New("cannot be less than 0")
158+
}
159+
if fingerNumber > util.M {
160+
return errors.New(fmt.Sprintf("cannot exceed %d", util.M))
161+
}
162+
163+
fingerIndex := fingerNumber - 1
164+
165+
fId := (int(c.ID()) + int(math.Pow(2, float64(fingerNumber-1)))) % int(math.Pow(2, util.M))
166+
167+
var err error
168+
c.finger[fingerIndex], err = c.FindSuccessor(context.Background(), uint64(fId))
169+
if err != nil {
170+
return err
171+
}
172+
//c.fingerIdx[fingerIndex] = uint64(fId)
173+
174+
if c.finger[fingerIndex] != nil {
175+
log.Printf("%s [%d]: Finger resolved { Index: %d, id: %d, successor: %d }", c.Addr(), c.ID(), fingerIndex, fId, c.finger[fingerIndex].ID())
176+
}
177+
178+
return err
179+
}

chord/chord_test.go

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
package chord
2+
3+
import (
4+
"context"
5+
"github.com/yousuf64/chord-kv/util"
6+
"testing"
7+
"time"
8+
)
9+
10+
var testTable = []struct {
11+
id uint64
12+
successorId uint64
13+
predecessorId uint64
14+
fingerIdx [util.M]uint64
15+
fingerId [util.M]uint64
16+
}{
17+
{id: 0, successorId: 1, predecessorId: 3, fingerIdx: [util.M]uint64{1, 2, 4}, fingerId: [util.M]uint64{1, 3, 0}},
18+
{id: 1, successorId: 3, predecessorId: 0, fingerIdx: [util.M]uint64{2, 3, 5}, fingerId: [util.M]uint64{3, 3, 0}},
19+
{id: 3, successorId: 0, predecessorId: 1, fingerIdx: [util.M]uint64{4, 5, 7}, fingerId: [util.M]uint64{0, 0, 0}},
20+
}
21+
22+
func Test_JoinAllToInitNode(t *testing.T) {
23+
n0 := NewChord("node6")
24+
n0.Join(context.Background(), nil) // init lnode, n0.predecessor = nil
25+
runPeriodicJobs(n0)
26+
//startJobs(n0) // --
27+
28+
n1 := NewChord("node7")
29+
n1.Join(context.Background(), n0) // calls n0 -> n1.successor = n0
30+
runPeriodicJobs(n0, n1, n0, n1)
31+
//startJobs(n1) // checks if successor's predecessor is myself
32+
// Notify(n0 to update its predecessor)
33+
34+
n2 := NewChord("node2")
35+
n2.Join(context.Background(), n0)
36+
runPeriodicJobs(n0, n1, n2, n0, n1, n2, n0, n1, n2)
37+
//startJobs(n2)
38+
39+
//n2.ClientPut("Lord of the Rings")
40+
41+
// Lord - 2 -> lnode
42+
// of - 4
43+
// the - 2
44+
// Rings - 5
45+
46+
// Node02
47+
//// Lord
48+
////// [of, the, Rings]
49+
50+
//n2.ClientGet("Lord (remote) Rings (local)")
51+
// Lord - 2 -> lnode.Get(key: of, match: Lord Rings) -> Hit Node02
52+
53+
//time.Sleep(time.Second * 20)
54+
//c := make(chan os.Signal, 1)
55+
//signal.Notify(c, syscall.SIGTERM, syscall.SIGKILL)
56+
//<-c
57+
58+
evaluateNodes(t, n0, n1, n2)
59+
}
60+
61+
func Test_JoinDiffNodes(t *testing.T) {
62+
n0 := NewChord("node6")
63+
n0.Join(context.Background(), nil)
64+
runPeriodicJobs(n0)
65+
66+
n1 := NewChord("node7")
67+
n1.Join(context.Background(), n0)
68+
runPeriodicJobs(n0, n1, n0, n1)
69+
70+
n2 := NewChord("node2")
71+
n2.Join(context.Background(), n1)
72+
runPeriodicJobs(n0, n1, n2, n0, n1, n2)
73+
74+
evaluateNodes(t, n0, n1, n2)
75+
}
76+
77+
func Test_JoinAllNodes_DiffOrder(t *testing.T) {
78+
n0 := NewChord("node6")
79+
n0.Join(context.Background(), nil)
80+
runPeriodicJobs(n0)
81+
82+
n2 := NewChord("node2")
83+
n2.Join(context.Background(), n0)
84+
runPeriodicJobs(n0, n2, n0, n2)
85+
86+
n1 := NewChord("node7")
87+
n1.Join(context.Background(), n0)
88+
runPeriodicJobs(n0, n2, n1, n0, n2, n1)
89+
90+
evaluateNodes(t, n0, n1, n2)
91+
}
92+
93+
func Test_JoinDiffNodes_DiffOrder(t *testing.T) {
94+
n0 := NewChord("node6")
95+
n0.Join(context.Background(), nil)
96+
runPeriodicJobs(n0)
97+
98+
n2 := NewChord("node2")
99+
n2.Join(context.Background(), n0)
100+
runPeriodicJobs(n0, n2, n0, n2)
101+
102+
n1 := NewChord("node7")
103+
n1.Join(context.Background(), n2)
104+
runPeriodicJobs(n0, n2, n1, n0, n2, n1)
105+
106+
evaluateNodes(t, n0, n1, n2)
107+
}
108+
109+
func Test_JoinAllNodes_DiffOrder_2(t *testing.T) {
110+
n2 := NewChord("node2")
111+
n2.Join(context.Background(), nil)
112+
runPeriodicJobs(n2)
113+
114+
n1 := NewChord("node7")
115+
n1.Join(context.Background(), n2)
116+
runPeriodicJobs(n2, n1, n2, n1)
117+
118+
n0 := NewChord("node6")
119+
n0.Join(context.Background(), n2)
120+
runPeriodicJobs(n2, n1, n0, n2, n1, n0)
121+
122+
evaluateNodes(t, n0, n1, n2)
123+
}
124+
125+
func Test_JoinDiffNodes_DiffOrder_2(t *testing.T) {
126+
n2 := NewChord("node2")
127+
n2.Join(context.Background(), nil)
128+
runPeriodicJobs(n2)
129+
130+
n1 := NewChord("node7")
131+
n1.Join(context.Background(), n2)
132+
runPeriodicJobs(n2, n1, n2, n1)
133+
134+
n0 := NewChord("node6")
135+
n0.Join(context.Background(), n1)
136+
runPeriodicJobs(n2, n1, n0, n2, n1, n0)
137+
138+
evaluateNodes(t, n0, n1, n2)
139+
}
140+
141+
func evaluateNodes(t *testing.T, ns ...*Chord) {
142+
for i, node := range ns {
143+
testItem := testTable[i]
144+
145+
if node.ID() != testItem.id {
146+
t.Fatalf("[%d] lnode id mismatch", i)
147+
}
148+
149+
if node.successor.ID() != testItem.successorId {
150+
t.Fatalf("[%d] lnode successor mismatch", i)
151+
}
152+
153+
if node.predecessor.ID() != testItem.predecessorId {
154+
t.Fatalf("[%d] lnode predecessor mismatch", i)
155+
}
156+
157+
// TODO: ADD
158+
//if node.fingerIdx != testItem.fingerIdx {
159+
// t.Fatalf("[%d] finger idx mismatch", i)
160+
//}
161+
162+
for fi, f := range node.finger {
163+
if f.ID() != testItem.fingerId[fi] {
164+
t.Fatalf("[%d:%d] finger id mismatch", i, fi)
165+
}
166+
}
167+
}
168+
}
169+
170+
func runPeriodicJobs(ns ...*Chord) {
171+
for _, n := range ns {
172+
n.Stabilize()
173+
n.FixFinger(1)
174+
n.FixFinger(2)
175+
n.FixFinger(3)
176+
}
177+
}
178+
179+
func startJobs(chord Core) {
180+
go func() {
181+
t := time.NewTicker(time.Millisecond * 100)
182+
for range t.C {
183+
err := chord.Stabilize()
184+
if err != nil {
185+
panic(err)
186+
}
187+
}
188+
}()
189+
190+
go func() {
191+
n := 1
192+
193+
t := time.NewTicker(time.Millisecond * 150)
194+
for range t.C {
195+
if n > util.M {
196+
n = 1
197+
}
198+
199+
err := chord.FixFinger(n)
200+
if err != nil {
201+
panic(err)
202+
}
203+
204+
n++
205+
}
206+
}()
207+
208+
//chord.CheckPredecessor()
209+
}

0 commit comments

Comments
 (0)