processor_slab.go
package main

import (
	"fmt"
	"log"
	"net"
	"time"

	"github.com/bradfitz/gomemcache/memcache"
)

// SlabProcessor migrates data from a single slab by reading a chunk of keys and copying them
// over one by one. If the `Move` flag is set, successfully migrated entries are removed from
// the source node.
type SlabProcessor struct {
	Name                  string
	Slab                  int64
	BatchSize             int64
	MemcacheSrcAddress    string
	MemcacheDestAddresses []string
	Move                  bool
	initialised           bool
	stats                 Stats
}

// NewSlabProcessor returns a new processor that owns the migration of a specific memcached node/slab.
func NewSlabProcessor(srcMemcacheAddr string, destMemcacheAddr []string, slab int64, batchSize int64, move bool) *SlabProcessor {
	p := &SlabProcessor{
		Name:                  fmt.Sprintf("Slab Processor on host %s - slab %d (batch size: %d)", srcMemcacheAddr, slab, batchSize),
		Slab:                  slab,
		BatchSize:             batchSize,
		MemcacheSrcAddress:    srcMemcacheAddr,
		MemcacheDestAddresses: destMemcacheAddr,
		Move:                  move,
	}
	p.stats = Stats{
		ProcessorName: p.Name,
	}
	return p
}
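
// exampleCopySlab is a hypothetical usage sketch, not part of the original tool: it shows how
// the constructor, RunOnce and GetStats fit together when copying a single slab to a
// destination node without removing the source data (move=false). The addresses, slab id and
// batch size below are placeholder values.
func exampleCopySlab() {
	p := NewSlabProcessor("10.0.0.1:11211", []string{"10.0.0.2:11211"}, 5, 1000, false)
	p.RunOnce()

	stats := p.GetStats()
	fmt.Printf("%s: processed %d items (%d get errors, %d set errors)\n",
		stats.ProcessorName, stats.Processed, stats.GetErrors, stats.SetErrors)
}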

// RunOnce reads as many keys as it can from the given slab
// (the number of keys returned per request is limited, see https://github.com/memcached/memcached/issues/153)
// and migrates the corresponding items to the destination cluster.
// When the `Move` flag is set, the successfully migrated items are removed from the source server,
// making it possible to request new batches in a loop until all the data is migrated.
func (p *SlabProcessor) RunOnce() {
	if !p.initialised {
		p.stats.StartTime = time.Now()
		p.initialised = true
	}

	chKeys := make(chan string, 100)
	chCopiedKeys := make(chan string, 100)
	resReaderCh := make(chan Stats)
	resWriterCh := make(chan Stats)
	resEraserCh := make(chan Stats)

	// read keys directly from the source memcache instance
	conn, err := net.Dial("tcp", p.MemcacheSrcAddress)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	chItems := make(chan *memcache.Item, 100)

	go p.runReader([]string{p.MemcacheSrcAddress}, chKeys, chItems, resReaderCh)
	go p.runWriter(p.MemcacheDestAddresses, chItems, chCopiedKeys, resWriterCh)
	if p.Move {
		go p.runEraser([]string{p.MemcacheSrcAddress}, chCopiedKeys, resEraserCh)
	} else {
		go p.noOp([]string{p.MemcacheSrcAddress}, chCopiedKeys, resEraserCh)
	}

	readKeysFromSlab(conn, p.Slab, p.BatchSize, chKeys)
	close(chKeys)

	// wait for each pipeline stage to finish and merge its stats
	stats2 := <-resReaderCh
	p.stats.Processed += stats2.Processed
	p.stats.GetErrors += stats2.GetErrors

	stats2 = <-resWriterCh
	p.stats.SetErrors += stats2.SetErrors

	stats2 = <-resEraserCh
	p.stats.DelErrors += stats2.DelErrors

	p.stats.EndTime = time.Now()
}
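
// exampleDrainSlab is a hypothetical sketch of the batching loop described above, not the
// actual Run() implementation: with the `Move` flag enabled, each RunOnce call drains one
// batch from the source slab, so repeating it until no new items are processed migrates
// the whole slab.
func exampleDrainSlab(p *SlabProcessor) {
	for {
		before := p.GetStats().Processed
		p.RunOnce()
		if p.GetStats().Processed == before {
			// no new keys were read from the slab, assume it is empty
			break
		}
	}
}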

// Run will implement logic similar to ServerProcessor.Run(), i.e. it will iterate through
// pages of keys and migrate all the data until the source slab is empty.
func (p *SlabProcessor) Run() {
}

// GetStats returns the stats collected during a run.
func (p *SlabProcessor) GetStats() Stats {
	return p.stats
}

// runWriter writes the items produced by runReader to the destination cluster and
// forwards the successfully copied keys to the eraser stage.
func (p *SlabProcessor) runWriter(memcacheAddresses []string, ch <-chan *memcache.Item, chCopiedKeys chan<- string, resCh chan<- Stats) {
	stats := Stats{}
	client := memcache.New(memcacheAddresses...)
	for item := range ch {
		stats.Processed++
		err := storeItem(client, item)
		if err != nil {
			log.Printf("Error storing '%s' / '%s': %s\n", item.Key, item.Value, err.Error())
			stats.SetErrors++
			continue
		}
		chCopiedKeys <- item.Key
	}
	resCh <- stats
	close(chCopiedKeys)
}

// runReader fetches the items for the keys read from the slab and passes them on to runWriter.
func (p *SlabProcessor) runReader(memcacheAddresses []string, chKeys <-chan string, chItems chan<- *memcache.Item, resCh chan<- Stats) {
	stats := Stats{}
	client := memcache.New(memcacheAddresses...)
	for key := range chKeys {
		stats.Processed++
		item, err := readItem(client, key)
		if err != nil {
			log.Printf("Error reading '%s': %s\n", key, err.Error())
			stats.GetErrors++
			continue
		}
		chItems <- item
	}
	resCh <- stats
	close(chItems)
}

// runEraser deletes the successfully migrated entries from the source node; it is only
// started when the `Move` flag is set.
func (p *SlabProcessor) runEraser(memcacheAddresses []string, chCopiedKeys <-chan string, resCh chan<- Stats) {
	stats := Stats{}
	client := memcache.New(memcacheAddresses...)
	for key := range chCopiedKeys {
		stats.Processed++
		err := deleteItem(client, key)
		if err != nil {
			log.Printf("Error deleting '%s': %s\n", key, err.Error())
			stats.DelErrors++
			continue
		}
	}
	resCh <- stats
}

// noOp drains the channel of migrated entries. It is used in place of runEraser when the
// `Move` flag is false, so that runWriter never blocks on a full channel.
func (p *SlabProcessor) noOp(memcacheAddresses []string, chCopiedKeys <-chan string, resCh chan<- Stats) {
	stats := Stats{}
	for range chCopiedKeys {
		// do nothing, just drain the channel
		stats.Processed++
	}
	resCh <- stats
}