-
Notifications
You must be signed in to change notification settings - Fork 236
/
Copy pathwalreplay.go
299 lines (268 loc) · 8.84 KB
/
walreplay.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
package executor
import (
"errors"
"fmt"
goio "io"
"path/filepath"
"sort"
"github.com/alpacahq/marketstore/v4/executor/wal"
"github.com/alpacahq/marketstore/v4/utils/io"
"github.com/alpacahq/marketstore/v4/utils/log"
)
// Replay loads this WAL file's unwritten transactions to the primary store and
// marks the file completely processed.
//
// This is done in two passes:
//
//  1. First WAL pass: locate the Transaction Group IDs (TGIDs) that are not yet
//     durably written to the primary store, collecting their data and offsets.
//  2. Second WAL pass: load the open TG data into the primary data files.
//  3. Flush the results and mark this WAL file completely processed.
//
// The TG data for any given TGID should appear in the WAL only once; this is
// verified during the first pass. When dryRun is true the WAL is scanned but no
// writes are performed.
func (wf *WALFileType) Replay(dryRun bool) error {
	// Make sure this file needs replay.
	needsReplay, err := wf.NeedsReplay()
	if err != nil {
		return fmt.Errorf("check if walfile needs to be replayed: %w", err)
	}
	if !needsReplay {
		log.Info("No WAL Replay needed.")
		return wal.ReplayError{
			Msg:  "WALFileType.NeedsReplay No Replay Needed",
			Cont: true,
		}
	}
	// Take control of this file and set the status.
	if !dryRun {
		if err2 := wf.WriteStatus(wal.OPEN, wal.REPLAYINPROCESS); err2 != nil {
			return fmt.Errorf("failed to write REPLAY_IN_PROCESS status to wal: %w", err2)
		}
	}
	// First pass of WAL replay: determine transaction states and record locations of TG data.
	txnStateWAL := make(map[int64]TxnStatusEnum)
	txnStatePrimary := make(map[int64]TxnStatusEnum)
	offsetTGDataInWAL := make(map[int64]int64)
	log.Info("Beginning WAL Replay")
	if dryRun {
		log.Info("Debugging mode enabled - no writes will be performed...")
	}
	// Map of TGID -> serialized TG data collected prior to replay.
	tgData := make(map[int64][]byte)
	_, err = wf.FilePtr.Seek(0, goio.SeekStart)
	if err != nil {
		return fmt.Errorf("seek wal from start for replay:%w", err)
	}
	continueRead := true
	for continueRead {
		msgID, err := wf.readMessageID()
		if continueRead = fullRead(err); !continueRead {
			break // Break out of read loop
		}
		switch msgID {
		case TGDATA:
			// Read a TGData message; remember where the payload starts for the second pass.
			offset, err := wf.FilePtr.Seek(0, goio.SeekCurrent)
			if err != nil {
				return fmt.Errorf("seek error: %w", err)
			}
			tgID, tgSerialized, err := wf.readTGData()
			if continueRead = fullRead(err); !continueRead {
				break // Break out of switch
			}
			// Store the TG data only after a complete, successful read
			// (previously it was stored before the error check, so a partial
			// payload could be recorded and replayed).
			tgData[tgID] = tgSerialized
			// Give up replay if there is already a TG data location for this TGID in the WAL.
			if _, ok := offsetTGDataInWAL[tgID]; ok {
				log.Error(io.GetCallerFileContext(0) + ": Duplicate TG Data in WAL")
				return wal.ReplayError{
					Msg:  fmt.Sprintf("Duplicate TG Data in WAL. tgID=%d", tgID),
					Cont: true,
				}
			}
			// Save the offset of this TG Data for the second pass.
			offsetTGDataInWAL[tgID] = offset
		case TXNINFO:
			// Read a TXNInfo message.
			TGID, destination, txnStatus, err := wf.readTransactionInfo()
			if continueRead = fullRead(err); !continueRead {
				break // Break out of switch
			}
			switch destination {
			case WAL:
				txnStateWAL[TGID] = txnStatus
			case CHECKPOINT:
				if _, ok := tgData[TGID]; ok && txnStatus == COMMITCOMPLETE {
					// This TG was durably committed: remove all TGData for
					// tgID less than or equal to this complete one.
					for tgid := range tgData {
						if tgid <= TGID {
							tgData[tgid] = nil
							delete(tgData, tgid)
						}
					}
				} else {
					// Record this txnStatus for later analysis.
					txnStatePrimary[TGID] = txnStatus
				}
			}
		case STATUS:
			// Read the status - note that this message should only be at the file beginning.
			_, _, _, err := wal.ReadStatus(wf.FilePtr)
			if continueRead = fullRead(err); !continueRead {
				break // Break out of switch
			}
		default:
			log.Warn("Unknown message id %d", msgID)
		}
	}
	// Second pass of WAL replay: replay any TG data that never reached COMMITCOMPLETE.
	log.Info("Entering replay of TGData")
	// Replay TGs in the TGID order defined by TGIDlist's sort implementation.
	// NOTE(review): the original comment claimed "descending" order — confirm
	// against TGIDlist.Less, since sort.Sort follows whatever Less defines.
	var sortedTGIDs TGIDlist
	for tgid := range tgData {
		sortedTGIDs = append(sortedTGIDs, tgid)
	}
	sort.Sort(sortedTGIDs)
	for _, tgid := range sortedTGIDs {
		tgSerialized := tgData[tgid]
		if tgSerialized == nil {
			continue
		}
		if dryRun {
			continue
		}
		// Note that only TG data that did not have a COMMITCOMPLETE record are replayed.
		rootDir := filepath.Dir(wf.FilePtr.Name())
		tgID, wtSets := ParseTGData(tgSerialized, rootDir)
		if err := wf.replayTGData(tgID, wtSets); err != nil {
			return fmt.Errorf("replay transaction group data. tgID=%d, "+
				"write transaction size=%d:%w", tgID, len(wtSets), err)
		}
	}
	log.Info("Replay of WAL file %s finished", wf.FilePtr.Name())
	if !dryRun {
		if err := wf.WriteStatus(wal.OPEN, wal.REPLAYED); err != nil {
			return fmt.Errorf("failed to write REPLAYED status to wal: %w", err)
		}
	}
	log.Info("Finished replay of TGData")
	return nil
}
// replayTGData writes the given write-transaction sets of a single transaction
// group into their target files on the primary store, then records a checkpoint
// for tgID in the WAL. It is a no-op when wtSets is empty.
//
// Open file pointers are cached across the wtSets so each target file is opened
// at most once; the cache is closed (best effort, logged on failure) on return.
func (wf *WALFileType) replayTGData(tgID int64, wtSets []wal.WTSet) (err error) {
	if len(wtSets) == 0 {
		return nil
	}
	cfp := NewCachedFP() // Cached open file pointer
	defer func() {
		if err2 := cfp.Close(); err2 != nil {
			log.Error(fmt.Sprintf("failed to close cached file pointer %s: %v", cfp, err2))
		}
	}()
	for _, wtSet := range wtSets {
		fp, err2 := cfp.GetFP(wtSet.FilePath)
		if err2 != nil {
			return wal.ReplayError{
				Msg: fmt.Sprintf("failed to open a filepath %s in write transaction set:%v",
					wtSet.FilePath, err2.Error(),
				),
				Cont: true,
			}
		}
		switch wtSet.RecordType {
		case io.FIXED:
			if err3 := WriteBufferToFile(fp, wtSet.Buffer); err3 != nil {
				return err3
			}
		case io.VARIABLE:
			// Variable-length records carry the record length; it is needed to
			// use the time column as a sort key later.
			if err = WriteBufferToFileIndirect(fp,
				wtSet.Buffer,
				wtSet.VarRecLen,
			); err != nil {
				return err
			}
		default:
			// errors.New instead of fmt.Errorf: no formatting directives needed.
			return errors.New("record Type is incorrect from WALFile, may be invalid/outdated WAL file")
		}
	}
	wf.lastCommittedTGID = tgID
	if err = wf.CreateCheckpoint(); err != nil {
		return fmt.Errorf("create checkpoint of wal:%w", err)
	}
	return nil
}
// fullRead reports whether WAL reading should continue after err.
// A nil error means a full read (continue). io.EOF and wal.ShortReadError both
// stop further reading (EOF logged at debug level, short reads at info level).
// Any other error is logged as an uncorrectable IO error, but reading continues.
func fullRead(err error) bool {
	switch {
	case err == nil:
		return true
	case errors.Is(err, goio.EOF):
		log.Debug("fullRead: read until the end of WAL file")
		return false
	}
	var shortRead wal.ShortReadError
	if errors.As(err, &shortRead) {
		log.Info(fmt.Sprintf("Partial Read. err=%v", err))
		return false
	}
	log.Error(io.GetCallerFileContext(0) + ": Uncorrectable IO error in WAL Replay")
	return true
}
// readMessageID reads 1 byte from the current offset of WALfile and returns it as a MessageID.
// If it's at the end of the wal file, readMessageID returns 0, io.EOF.
// If it's not at the end of the wal file but couldn't read 1 byte, it returns 0, wal.ShortReadError.
// An unrecognized byte yields unknownMessageID and a descriptive error.
func (wf *WALFileType) readMessageID() (mid MIDEnum, err error) {
	const unknownMessageID = 99
	var buffer [1]byte
	buf, _, err := wal.Read(wf.FilePtr, buffer[:])
	if err != nil {
		if errors.Is(err, goio.EOF) {
			return 0, goio.EOF
		}
		return 0, wal.ShortReadError("WALFileType.ReadMessageID. err:" + err.Error())
	}
	mid = MIDEnum(buf[0])
	switch mid {
	case TGDATA, TXNINFO, STATUS:
		return mid, nil
	}
	// err is necessarily nil here (all error paths returned above), so the
	// original `%w` wrap embedded "%!w(<nil>)" in the message; drop it.
	return unknownMessageID, fmt.Errorf("WALFileType.ReadMessageID Incorrect MID read, value: %d", mid)
}
const (
	// Sizes (in bytes) of the fields framing a serialized Transaction Group
	// in the WAL; see /docs/durable_writes_design.txt for definition.
	tgLenBytes    = 8  // length prefix of the serialized TG payload
	tgIDBytes     = 8  // transaction group ID at the start of the payload
	checkSumBytes = 16 // checksum trailing the payload
)
// readTGData reads one serialized Transaction Group from the current offset of
// the WAL file: an 8-byte length prefix, the payload (which begins with the
// TGID), and a trailing 16-byte checksum. It returns the TGID and the raw
// payload after the checksum validates; short reads surface as wal.ShortReadError.
func (wf *WALFileType) readTGData() (tgID int64, tgSerialized []byte, err error) {
	// Read the fixed-size length prefix of the TG payload.
	tgLenSerialized := make([]byte, tgLenBytes)
	tgLenSerialized, _, err = wal.Read(wf.FilePtr, tgLenSerialized)
	if err != nil {
		return 0, nil, wal.ShortReadError(io.GetCallerFileContext(0))
	}
	tgLen := io.ToInt64(tgLenSerialized)
	// Guard against a corrupt/implausible length before allocating the buffer.
	if !sanityCheckValue(wf.FilePtr, tgLen) {
		return 0, nil, errors.New(io.GetCallerFileContext(0) + fmt.Sprintf(": Insane TG Length: %d", tgLen))
	}
	// Read the data
	tgSerialized = make([]byte, tgLen)
	n, err := wf.FilePtr.Read(tgSerialized)
	if int64(n) != tgLen || err != nil {
		return 0, nil, wal.ShortReadError(io.GetCallerFileContext(0) + ":Reading Data")
	}
	// The TGID is serialized at the start of the payload.
	// NOTE(review): this slices tgIDBytes-1 (7) bytes rather than tgIDBytes (8) —
	// confirm whether io.ToInt64 tolerates a short slice or this drops a byte.
	tgID = io.ToInt64(tgSerialized[:tgIDBytes-1])
	// Read the checksum
	checkBuf := make([]byte, checkSumBytes)
	n, err = wf.FilePtr.Read(checkBuf)
	if n != checkSumBytes || err != nil {
		return 0, nil, wal.ShortReadError(io.GetCallerFileContext(0) + ":Reading Checksum")
	}
	if err := validateCheckSum(tgLenSerialized, tgSerialized, checkBuf); err != nil {
		return 0, nil, err
	}
	return tgID, tgSerialized, nil
}