diff --git a/README.md b/README.md
index d5a8b7896..719f7ea4e 100644
--- a/README.md
+++ b/README.md
@@ -111,6 +111,7 @@ wal_rotate_interval | int | Frequency (in mintues) at which the WAL file will be
 stale_threshold | int | Threshold (in days) by which MarketStore will declare a symbol stale
 enable_add | bool | Allows new symbols to be added to DB via /write API
 enable_remove | bool | Allows symbols to be removed from DB via /write API
+disable_variable_compression | bool | disables the default compression of variable data
 triggers | slice | List of trigger plugins
 bgworkers | slice | List of background worker plugins
diff --git a/cmd/start/main.go b/cmd/start/main.go
index 6178aea15..c6796271b 100644
--- a/cmd/start/main.go
+++ b/cmd/start/main.go
@@ -66,6 +66,11 @@ func executeStart(cmd *cobra.Command, args []string) error {
 		return fmt.Errorf("failed to parse configuration file error: %v", err.Error())
 	}
 
+	if utils.InstanceConfig.DisableVariableCompression {
+		executor.Compressed = false
+	}
+
+	// Spawn a goroutine and listen for a signal.
 	signalChan := make(chan os.Signal)
 	go func() {
diff --git a/executor/readvariable.go b/executor/readvariable.go
index aac23a49f..fe5a53b6d 100644
--- a/executor/readvariable.go
+++ b/executor/readvariable.go
@@ -1,6 +1,7 @@
 package executor
 
 import (
+	"github.com/klauspost/compress/snappy"
 	"os"
 	"unsafe"
 
@@ -32,12 +33,13 @@ func (r *reader) readSecondStage(bufMeta []bufferMeta, limitCount int32, directi
 	/*
 		Calculate how much space is needed in the results buffer
 	*/
+	/*
 	numIndexRecords := len(indexBuffer) / 24 // Three fields, {epoch, offset, len}, 8 bytes each
 	var totalDatalen int
 	numberLeftToRead := int(limitCount)
 	for i := 0; i < numIndexRecords; i++ {
 		datalen := int(ToInt64(indexBuffer[i*24+16:]))
-		numVarRecords := datalen / varRecLen
+		numVarRecords := datalen / varRecLen // TODO: This doesn't work with compression
 		if direction == FIRST {
 			if numVarRecords >= numberLeftToRead {
 				numVarRecords = numberLeftToRead
@@ -46,10 +48,12 @@ func (r *reader) readSecondStage(bufMeta []bufferMeta, limitCount int32, directi
 		totalDatalen += numVarRecords * (varRecLen + 8)
 		numberLeftToRead -= numVarRecords
 	}
-
-	numberLeftToRead = int(limitCount)
-	rb = make([]byte, totalDatalen)
-	var rbCursor int
+	*/
+	numIndexRecords := len(indexBuffer) / 24 // Three fields, {epoch, offset, len}, 8 bytes each
+	numberLeftToRead := int(limitCount)
+	rb = make([]byte, 0)
+	//rb = make([]byte, totalDatalen)
+	//var rbCursor int
 	for i := 0; i < numIndexRecords; i++ {
 		intervalStartEpoch := ToInt64(indexBuffer[i*24:])
 		offset := ToInt64(indexBuffer[i*24+8:])
@@ -62,6 +66,13 @@ func (r *reader) readSecondStage(bufMeta []bufferMeta, limitCount int32, directi
 			return nil, err
 		}
 
+		if Compressed {
+			buffer, err = snappy.Decode(nil, buffer)
+			if err != nil {
+				return nil, err
+			}
+		}
+
 		// Loop over the variable records and prepend the index time to each
 		numVarRecords := len(buffer) / varRecLen
 		if direction == FIRST {
@@ -76,9 +87,9 @@ func (r *reader) readSecondStage(bufMeta []bufferMeta, limitCount int32, directi
 
 		C.rewriteBuffer(arg1, C.int(varRecLen), C.int(numVarRecords), arg4,
 			C.int64_t(md.Intervals), C.int64_t(intervalStartEpoch))
 
-		//rb = append(rb, rbTemp...)
-		copy(rb[rbCursor:], rbTemp)
-		rbCursor += len(rbTemp)
+		rb = append(rb, rbTemp...)
+		//copy(rb[rbCursor:], rbTemp)
+		//rbCursor += len(rbTemp)
 
 		numberLeftToRead -= numVarRecords
 		if direction == FIRST {
diff --git a/executor/writer.go b/executor/writer.go
index e02cdcc72..36b9ea01c 100644
--- a/executor/writer.go
+++ b/executor/writer.go
@@ -2,6 +2,7 @@ package executor
 
 import (
 	"fmt"
+	"github.com/klauspost/compress/snappy"
 	stdio "io"
 	"os"
 	"strings"
@@ -18,6 +19,8 @@ import (
 //#cgo CFLAGS: -O3 -Wno-ignored-optimization-argument
 import "C"
+
+var Compressed = true
+
 type Writer struct {
 	root *catalog.Directory
 	tgc  *TransactionPipe
@@ -185,8 +188,14 @@ func WriteBufferToFileIndirect(fp *os.File, buffer offsetIndexBuffer, varRecLen
 		if _, err := fp.Read(oldData); err != nil {
 			return err
 		}
+		if Compressed {
+			oldData, err = snappy.Decode(nil, oldData)
+			if err != nil {
+				return err
+			}
+		}
 		dataToBeWritten = append(oldData, dataToBeWritten...)
-		dataLen = currentRecInfo.Len + dataLen
+		dataLen = int64(len(dataToBeWritten))
 	}
 
 	// Determine if this is a continuation write
@@ -208,9 +217,18 @@ func WriteBufferToFileIndirect(fp *os.File, buffer offsetIndexBuffer, varRecLen
 	/*
 		Write the data at the end of the file
 	*/
-	if _, err = fp.Write(dataToBeWritten); err != nil {
-		return err
+	if Compressed {
+		comp := snappy.Encode(nil, dataToBeWritten)
+		if _, err = fp.Write(comp); err != nil {
+			return err
+		}
+		dataLen = int64(len(comp))
+	} else {
+		if _, err = fp.Write(dataToBeWritten); err != nil {
+			return err
+		}
 	}
+
 	//log.Info("LAL end_off:%d, len:%d, data:%v", endOfFileOffset, dataLen, dataToBeWritten)
 
 	/*
@@ -303,68 +321,3 @@ func WriteCSM(csm io.ColumnSeriesMap, isVariableLength bool) (err error) {
 	wal.RequestFlush()
 	return nil
 }
-
-/*
-Legacy functions
-*/
-func WriteBufferToFileIndirectOverwrite(fp *os.File, buffer offsetIndexBuffer) (err error) {
-	/*
-		Here we write the data payload of the buffer to the end of the data file
-	*/
-	primaryOffset := buffer.Offset() // Offset to storage of indirect record info
-	index := buffer.Index()
-	dataToBeWritten := buffer.Payload()
-	dataLen := int64(len(dataToBeWritten))
-
-	/*
-		Write the data at the end of the file
-	*/
-	endOfFileOffset, _ := fp.Seek(0, stdio.SeekEnd)
-	_, err = fp.Write(dataToBeWritten)
-	if err != nil {
-		return err
-	}
-
-	/*
-		Now we write or update the index record
-		First we read the file at the index location to see if this is an incremental write
-	*/
-	fp.Seek(primaryOffset, stdio.SeekStart)
-	idBuf := make([]byte, 24) // {Index, Offset, Len}
-	_, err = fp.Read(idBuf)
-	if err != nil {
-		return err
-	}
-
-	currentRecInfo := SwapSliceByte(idBuf, IndirectRecordInfo{}).([]IndirectRecordInfo)[0]
-	/*
-		The default is a new write at the end of the file
-	*/
-	targetRecInfo := IndirectRecordInfo{Index: index, Offset: endOfFileOffset, Len: dataLen}
-
-	/*
-		If this is a continuation write, we adjust the targetRecInfo accordingly
-	*/
-	if currentRecInfo.Index != 0 { // If the index from the file is 0, this is a new write
-		cursor := currentRecInfo.Offset + currentRecInfo.Len
-		if endOfFileOffset == cursor {
-			// Incremental write
-			targetRecInfo.Len += currentRecInfo.Len
-			targetRecInfo.Offset = currentRecInfo.Offset
-		}
-	}
-
-	/*
-		Write the indirect record info at the primaryOffset
-	*/
-	odata := []int64{targetRecInfo.Index, targetRecInfo.Offset, targetRecInfo.Len}
-	obuf := SwapSliceData(odata, byte(0)).([]byte)
-
-	fp.Seek(-24, stdio.SeekCurrent)
-	_, err = fp.Write(obuf)
-	if err != nil {
-		return err
-	}
-
-	return nil
-}
diff --git a/go.mod b/go.mod
index dc1cf57f0..01ace9f12 100644
--- a/go.mod
+++ b/go.mod
@@ -16,6 +16,7 @@ require (
 	github.com/gorilla/rpc v1.1.0
 	github.com/gorilla/websocket v1.4.0
 	github.com/inconshreveable/mousetrap v1.0.0 // indirect
+	github.com/klauspost/compress v1.4.0
 	github.com/nats-io/gnatsd v1.3.0 // indirect
 	github.com/nats-io/go-nats v1.6.0
 	github.com/nats-io/nuid v1.0.0 // indirect
diff --git a/utils/config.go b/utils/config.go
index 300d58604..cfb710481 100644
--- a/utils/config.go
+++ b/utils/config.go
@@ -39,6 +39,7 @@ type MktsConfig struct {
 	EnableAdd bool
 	EnableRemove bool
 	EnableLastKnown bool
+	DisableVariableCompression bool
 	StartTime time.Time
 	Triggers []*TriggerSetting
 	BgWorkers []*BgWorkerSetting
@@ -58,6 +59,7 @@ func (m *MktsConfig) Parse(data []byte) error {
 		EnableAdd string `yaml:"enable_add"`
 		EnableRemove string `yaml:"enable_remove"`
 		EnableLastKnown string `yaml:"enable_last_known"`
+		DisableVariableCompression string `yaml:"disable_variable_compression"`
 		Triggers []struct {
 			Module string `yaml:"module"`
 			On string `yaml:"on"`
@@ -148,6 +150,13 @@ func (m *MktsConfig) Parse(data []byte) error {
 	m.EnableLastKnown = false
 	log.Info("Disabling \"enable_last_known\" feature until it is fixed...")
+
+	if aux.DisableVariableCompression != "" {
+		m.DisableVariableCompression, err = strconv.ParseBool(aux.DisableVariableCompression)
+		if err != nil {
+			log.Error("Invalid value for DisableVariableCompression")
+		}
+	}
 
 	/*
 		// Broken - disable for now
 		if aux.EnableLastKnown != "" {
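
Usage note: with this patch, variable-length data is snappy-compressed by default (executor.Compressed is initialized to true), and the new mkts.yml key opts out at startup. A minimal illustrative fragment, with all other settings omitted:

    # mkts.yml
    # The value is read as a string and passed through strconv.ParseBool,
    # so "true"/"True"/"TRUE"/"t"/"1" (and their false counterparts) are accepted.
    disable_variable_compression: true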