Skip to content

Commit 22f8bfc

Browse files
committed
implement querying, add dump endpoint for debugging
1 parent 060b8cc commit 22f8bfc

File tree

8 files changed

+345
-17
lines changed

8 files changed

+345
-17
lines changed

chord/bucketmap/bucketmap.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package bucketmap
22

33
import (
4+
"encoding/json"
45
"errors"
56
"fmt"
67
"github.com/yousuf64/chord-kv/node"
@@ -59,3 +60,72 @@ func (b *BucketMap) Add(bucketId uint64, insertItem node.InsertItem) error {
5960

6061
return nil
6162
}
63+
64+
func (b *BucketMap) Query(id uint64, index string, query string) (string, bool) {
65+
value, ok := b.buckets.Load(id)
66+
if !ok {
67+
return "", false
68+
}
69+
70+
split := strings.Split(query, " ")
71+
72+
bkt := value.(*bucket)
73+
OuterLoop:
74+
for _, it := range bkt.items {
75+
if it.Index == index {
76+
z := 0
77+
Loop:
78+
for _, s := range split {
79+
for _, sidx := range it.SecIdx[z:] {
80+
z++
81+
if s == sidx {
82+
continue Loop
83+
}
84+
}
85+
continue OuterLoop
86+
}
87+
88+
// Should have matched
89+
return it.Value, true
90+
}
91+
}
92+
93+
return "", false
94+
}
95+
96+
func (b *BucketMap) Dump() string {
97+
var dump []struct {
98+
Id uint64
99+
Items []item
100+
UniqueIndexes []string
101+
}
102+
103+
b.buckets.Range(func(key, value any) bool {
104+
i := struct {
105+
Id uint64
106+
Items []item
107+
UniqueIndexes []string
108+
}{
109+
Id: key.(uint64),
110+
Items: value.(*bucket).items,
111+
UniqueIndexes: nil,
112+
}
113+
114+
uq := make([]string, 0)
115+
value.(*bucket).uniqueIndexes.Range(func(key, value any) bool {
116+
uq = append(uq, key.(string))
117+
return true
118+
})
119+
i.UniqueIndexes = uq
120+
121+
dump = append(dump, i)
122+
return true
123+
})
124+
125+
v, err := json.MarshalIndent(dump, "", "\t")
126+
if err != nil {
127+
panic(err)
128+
}
129+
130+
return string(v)
131+
}

chord/chord.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ type ChordNode interface {
1818
Stabilize() error
1919
CheckPredecessor()
2020
FixFinger(fingerNumber int) error
21+
22+
// DEBUG
23+
Dump() string
2124
}
2225

2326
type Chord struct {
@@ -109,6 +112,29 @@ func (c *Chord) InsertBatch(ctx context.Context, items ...node.InsertItem) error
109112
return nil
110113
}
111114

115+
func (c *Chord) Query(ctx context.Context, index string, query string) (string, error) {
116+
id := util.Hash(index)
117+
if util.Between(id, c.predecessor.ID(), c.ID()) {
118+
value, ok := c.kv.Query(id, index, query)
119+
if !ok {
120+
return "", errors.New("not found")
121+
}
122+
123+
return value, nil
124+
} else {
125+
successor, err := c.FindSuccessor(ctx, id)
126+
if err != nil {
127+
return "", err
128+
}
129+
130+
value, err := successor.Query(ctx, index, query)
131+
if err != nil {
132+
return "", err
133+
}
134+
return value, nil
135+
}
136+
}
137+
112138
func (c *Chord) insertLocal(ctx context.Context, items []node.InsertItem) {
113139
for _, item := range items {
114140
itemHash := util.Hash(item.Index)
@@ -252,3 +278,7 @@ func (c *Chord) FixFinger(fingerNumber int) error {
252278

253279
return err
254280
}
281+
282+
func (c *Chord) Dump() string {
283+
return c.kv.Dump()
284+
}

kv/kv.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ import (
99

1010
type KV interface {
1111
Insert(ctx context.Context, key string, value string) error
12+
Get(ctx context.Context, query string) (string, error)
13+
14+
// DEBUG
15+
Dump() string
1216
}
1317

1418
type DistributedKV struct {
@@ -41,3 +45,19 @@ func (d *DistributedKV) Insert(ctx context.Context, key string, value string) er
4145
}
4246
return nil
4347
}
48+
49+
func (d *DistributedKV) Get(ctx context.Context, query string) (string, error) {
50+
query = strings.ToLower(query)
51+
index := strings.SplitN(query, " ", 2)[0]
52+
// TODO: Prioritize looking into local node first
53+
value, err := d.c.Query(ctx, index, query)
54+
if err != nil {
55+
return "", err
56+
}
57+
58+
return value, nil
59+
}
60+
61+
func (d *DistributedKV) Dump() string {
62+
return d.c.Dump()
63+
}

node/node.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type Node interface {
1818
GetPredecessor(ctx context.Context) (Node, error)
1919

2020
InsertBatch(ctx context.Context, items ...InsertItem) error
21+
Query(ctx context.Context, index string, query string) (string, error)
2122
}
2223

2324
type InsertItem struct {
@@ -65,6 +66,20 @@ func (r *RemoteNode) InsertBatch(ctx context.Context, items ...InsertItem) error
6566
return nil
6667
}
6768

69+
func (r *RemoteNode) Query(ctx context.Context, index string, query string) (string, error) {
70+
req := &transport.QueryRequest{
71+
Index: index,
72+
Query: query,
73+
}
74+
75+
reply, err := r.client.Query(ctx, req)
76+
if err != nil {
77+
return "", err
78+
}
79+
80+
return reply.Value, nil
81+
}
82+
6883
func (r *RemoteNode) ID() uint64 {
6984
return r.id
7085
}

node/server/server.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,12 @@ func (s *Server) Insert(ctx context.Context, request *transport.InsertRequest) (
5959
}
6060
return &emptypb.Empty{}, nil
6161
}
62+
63+
func (s *Server) Query(ctx context.Context, request *transport.QueryRequest) (*transport.QueryReply, error) {
64+
reply, err := s.chord.Query(ctx, request.GetIndex(), request.GetQuery())
65+
if err != nil {
66+
return nil, err
67+
}
68+
69+
return &transport.QueryReply{Value: reply}, nil
70+
}

0 commit comments

Comments
 (0)