Skip to content

Commit

Permalink
new
Browse files Browse the repository at this point in the history
  • Loading branch information
Luke Lonergan committed Dec 6, 2018
1 parent 91f1d5b commit fa85776
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 76 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ wal_rotate_interval | int | Frequency (in minutes) at which the WAL file will be
stale_threshold | int | Threshold (in days) by which MarketStore will declare a symbol stale
enable_add | bool | Allows new symbols to be added to DB via /write API
enable_remove | bool | Allows symbols to be removed from DB via /write API
disable_variable_compression | bool | Disables the default compression of variable data
triggers | slice | List of trigger plugins
bgworkers | slice | List of background worker plugins

Expand Down
5 changes: 5 additions & 0 deletions cmd/start/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ func executeStart(cmd *cobra.Command, args []string) error {
return fmt.Errorf("failed to parse configuration file error: %v", err.Error())
}

if utils.InstanceConfig.DisableVariableCompression {
executor.Compressed = false
}


// Spawn a goroutine and listen for a signal.
signalChan := make(chan os.Signal)
go func() {
Expand Down
27 changes: 19 additions & 8 deletions executor/readvariable.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package executor

import (
"github.com/klauspost/compress/snappy"
"os"
"unsafe"

Expand Down Expand Up @@ -32,12 +33,13 @@ func (r *reader) readSecondStage(bufMeta []bufferMeta, limitCount int32, directi
/*
Calculate how much space is needed in the results buffer
*/
/*
numIndexRecords := len(indexBuffer) / 24 // Three fields, {epoch, offset, len}, 8 bytes each
var totalDatalen int
numberLeftToRead := int(limitCount)
for i := 0; i < numIndexRecords; i++ {
datalen := int(ToInt64(indexBuffer[i*24+16:]))
numVarRecords := datalen / varRecLen
numVarRecords := datalen / varRecLen // TODO: This doesn't work with compression
if direction == FIRST {
if numVarRecords >= numberLeftToRead {
numVarRecords = numberLeftToRead
Expand All @@ -46,10 +48,12 @@ func (r *reader) readSecondStage(bufMeta []bufferMeta, limitCount int32, directi
totalDatalen += numVarRecords * (varRecLen + 8)
numberLeftToRead -= numVarRecords
}

numberLeftToRead = int(limitCount)
rb = make([]byte, totalDatalen)
var rbCursor int
*/
numIndexRecords := len(indexBuffer) / 24 // Three fields, {epoch, offset, len}, 8 bytes each
numberLeftToRead := int(limitCount)
rb = make([]byte, 0)
//rb = make([]byte, totalDatalen)
//var rbCursor int
for i := 0; i < numIndexRecords; i++ {
intervalStartEpoch := ToInt64(indexBuffer[i*24:])
offset := ToInt64(indexBuffer[i*24+8:])
Expand All @@ -62,6 +66,13 @@ func (r *reader) readSecondStage(bufMeta []bufferMeta, limitCount int32, directi
return nil, err
}

if Compressed {
buffer, err = snappy.Decode(nil, buffer)
if err != nil {
return nil, err
}
}

// Loop over the variable records and prepend the index time to each
numVarRecords := len(buffer) / varRecLen
if direction == FIRST {
Expand All @@ -76,9 +87,9 @@ func (r *reader) readSecondStage(bufMeta []bufferMeta, limitCount int32, directi
C.rewriteBuffer(arg1, C.int(varRecLen), C.int(numVarRecords), arg4,
C.int64_t(md.Intervals), C.int64_t(intervalStartEpoch))

//rb = append(rb, rbTemp...)
copy(rb[rbCursor:], rbTemp)
rbCursor += len(rbTemp)
rb = append(rb, rbTemp...)
//copy(rb[rbCursor:], rbTemp)
//rbCursor += len(rbTemp)

numberLeftToRead -= numVarRecords
if direction == FIRST {
Expand Down
89 changes: 21 additions & 68 deletions executor/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package executor

import (
"fmt"
"github.com/klauspost/compress/snappy"
stdio "io"
"os"
"strings"
Expand All @@ -18,6 +19,8 @@ import (
//#cgo CFLAGS: -O3 -Wno-ignored-optimization-argument
import "C"

var Compressed = true

type Writer struct {
root *catalog.Directory
tgc *TransactionPipe
Expand Down Expand Up @@ -185,8 +188,14 @@ func WriteBufferToFileIndirect(fp *os.File, buffer offsetIndexBuffer, varRecLen
if _, err := fp.Read(oldData); err != nil {
return err
}
if Compressed {
oldData, err = snappy.Decode(nil, oldData)
if err != nil {
return err
}
}
dataToBeWritten = append(oldData, dataToBeWritten...)
dataLen = currentRecInfo.Len + dataLen
dataLen = int64(len(dataToBeWritten))
}

// Determine if this is a continuation write
Expand All @@ -208,9 +217,18 @@ func WriteBufferToFileIndirect(fp *os.File, buffer offsetIndexBuffer, varRecLen
/*
Write the data at the end of the file
*/
if _, err = fp.Write(dataToBeWritten); err != nil {
return err
if Compressed {
comp := snappy.Encode(nil, dataToBeWritten)
if _, err = fp.Write(comp); err != nil {
return err
}
dataLen = int64(len(comp))
} else {
if _, err = fp.Write(dataToBeWritten); err != nil {
return err
}
}

//log.Info("LAL end_off:%d, len:%d, data:%v", endOfFileOffset, dataLen, dataToBeWritten)

/*
Expand Down Expand Up @@ -303,68 +321,3 @@ func WriteCSM(csm io.ColumnSeriesMap, isVariableLength bool) (err error) {
wal.RequestFlush()
return nil
}

/*
Legacy functions
*/
// WriteBufferToFileIndirectOverwrite appends the payload of buffer to the end of
// the data file fp, then writes (or updates) the 24-byte indirect record
// {Index, Offset, Len} stored at the buffer's primary offset so it points at the
// newly written data. If the existing record shows its data ending exactly at
// the old end-of-file, the write is treated as a continuation and the record's
// length is extended in place instead of being replaced.
//
// Fixes vs. previous version: every fp.Seek error return is now checked — the
// old code discarded them (including the end-of-file offset seek, which could
// have written a garbage offset into the index record on failure).
func WriteBufferToFileIndirectOverwrite(fp *os.File, buffer offsetIndexBuffer) (err error) {
	primaryOffset := buffer.Offset() // Offset to storage of indirect record info
	index := buffer.Index()
	dataToBeWritten := buffer.Payload()
	dataLen := int64(len(dataToBeWritten))

	/*
		Write the data at the end of the file
	*/
	endOfFileOffset, err := fp.Seek(0, stdio.SeekEnd)
	if err != nil {
		return err
	}
	if _, err = fp.Write(dataToBeWritten); err != nil {
		return err
	}

	/*
		Now we write or update the index record.
		First we read the file at the index location to see if this is an incremental write.
	*/
	if _, err = fp.Seek(primaryOffset, stdio.SeekStart); err != nil {
		return err
	}
	idBuf := make([]byte, 24) // {Index, Offset, Len}, three int64 fields
	if _, err = fp.Read(idBuf); err != nil {
		return err
	}

	currentRecInfo := SwapSliceByte(idBuf, IndirectRecordInfo{}).([]IndirectRecordInfo)[0]

	// The default is a new write at the end of the file.
	targetRecInfo := IndirectRecordInfo{Index: index, Offset: endOfFileOffset, Len: dataLen}

	/*
		If this is a continuation write, we adjust the targetRecInfo accordingly
	*/
	if currentRecInfo.Index != 0 { // If the index from the file is 0, this is a new write
		cursor := currentRecInfo.Offset + currentRecInfo.Len
		if endOfFileOffset == cursor {
			// Incremental write: the new data directly follows the old data,
			// so extend the existing record rather than replacing it.
			targetRecInfo.Len += currentRecInfo.Len
			targetRecInfo.Offset = currentRecInfo.Offset
		}
	}

	/*
		Write the indirect record info at the primaryOffset
	*/
	odata := []int64{targetRecInfo.Index, targetRecInfo.Offset, targetRecInfo.Len}
	obuf := SwapSliceData(odata, byte(0)).([]byte)

	// Rewind over the 24 bytes just read and overwrite them in place.
	if _, err = fp.Seek(-24, stdio.SeekCurrent); err != nil {
		return err
	}
	if _, err = fp.Write(obuf); err != nil {
		return err
	}

	return nil
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/gorilla/rpc v1.1.0
github.com/gorilla/websocket v1.4.0
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/compress v1.4.0
github.com/nats-io/gnatsd v1.3.0 // indirect
github.com/nats-io/go-nats v1.6.0
github.com/nats-io/nuid v1.0.0 // indirect
Expand Down
9 changes: 9 additions & 0 deletions utils/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type MktsConfig struct {
EnableAdd bool
EnableRemove bool
EnableLastKnown bool
DisableVariableCompression bool
StartTime time.Time
Triggers []*TriggerSetting
BgWorkers []*BgWorkerSetting
Expand All @@ -58,6 +59,7 @@ func (m *MktsConfig) Parse(data []byte) error {
EnableAdd string `yaml:"enable_add"`
EnableRemove string `yaml:"enable_remove"`
EnableLastKnown string `yaml:"enable_last_known"`
DisableVariableCompression string `yaml:"disable_variable_compression"`
Triggers []struct {
Module string `yaml:"module"`
On string `yaml:"on"`
Expand Down Expand Up @@ -148,6 +150,13 @@ func (m *MktsConfig) Parse(data []byte) error {

m.EnableLastKnown = false
log.Info("Disabling \"enable_last_known\" feature until it is fixed...")

if aux.DisableVariableCompression != "" {
m.DisableVariableCompression, err = strconv.ParseBool(aux.DisableVariableCompression)
if err != nil {
log.Error("Invalid value for DisableVariableCompression")
}
}
/*
// Broken - disable for now
if aux.EnableLastKnown != "" {
Expand Down

0 comments on commit fa85776

Please sign in to comment.