Skip to content

Commit

Permalink
transaction update
Browse files Browse the repository at this point in the history
  • Loading branch information
Lidonghai committed Jan 9, 2021
1 parent ff6c60e commit 6515ea8
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 137 deletions.
116 changes: 56 additions & 60 deletions kvdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ type Kvdb struct {
mats *sync.Map //map[string]*mat
iteratorOpts *opt.ReadOptions
OnExpirse func(key, value []byte)
tran *leveldb.Transaction
}

type KvItem struct {
Expand Down Expand Up @@ -108,7 +107,7 @@ func (k *Kvdb) NilTTL(key []byte) error {
}
}

func (k *Kvdb) SetTTL(key []byte, ttl int, tran bool) error {
func (k *Kvdb) SetTTL(key []byte, ttl int, tran *leveldb.Transaction) error {
if k.enableTtl && k.Exists(key, tran) {
if ttl > 0 {
return k.ttldb.SetTTL(ttl, key)
Expand All @@ -128,19 +127,19 @@ func (k *Kvdb) GetTTL(key []byte) (float64, error) {
}
}

func (k *Kvdb) Exists(key []byte, tran bool) bool {
if tran && k.tran != nil {
ok, _ := k.tran.Has(key, k.iteratorOpts)
func (k *Kvdb) Exists(key []byte, tran *leveldb.Transaction) bool {
if tran != nil {
ok, _ := tran.Has(key, k.iteratorOpts)
return ok
} else {
ok, _ := k.db.Has(key, k.iteratorOpts)
return ok
}
}

func (k *Kvdb) Get(key []byte, tran bool) ([]byte, error) {
if tran && k.tran != nil {
data, err := k.tran.Get(key, nil)
func (k *Kvdb) Get(key []byte, tran *leveldb.Transaction) ([]byte, error) {
if tran != nil {
data, err := tran.Get(key, nil)
if err != nil {
return nil, err
}
Expand All @@ -154,7 +153,7 @@ func (k *Kvdb) Get(key []byte, tran bool) ([]byte, error) {
}
}

func (k *Kvdb) GetObject(key []byte, value interface{}, tran bool) error {
func (k *Kvdb) GetObject(key []byte, value interface{}, tran *leveldb.Transaction) error {
data, err := k.Get(key, tran)
if err != nil {
return err
Expand All @@ -170,7 +169,7 @@ func (k *Kvdb) GetObject(key []byte, value interface{}, tran bool) error {
return nil
}

func (k *Kvdb) GetObjectFirst(value interface{}, tran bool) ([]byte, error) {
func (k *Kvdb) GetObjectFirst(value interface{}, tran *leveldb.Transaction) ([]byte, error) {
iter := k.newIter(nil, tran)
defer iter.Release()
if !iter.First() {
Expand All @@ -187,7 +186,7 @@ func (k *Kvdb) GetObjectFirst(value interface{}, tran bool) ([]byte, error) {
return iter.Key(), nil
}

func (k *Kvdb) GetObjectLast(value interface{}, tran bool) ([]byte, error) {
func (k *Kvdb) GetObjectLast(value interface{}, tran *leveldb.Transaction) ([]byte, error) {
iter := k.newIter(nil, tran)
defer iter.Release()
if !iter.Last() {
Expand All @@ -204,7 +203,7 @@ func (k *Kvdb) GetObjectLast(value interface{}, tran bool) ([]byte, error) {
return iter.Key(), nil
}

func (k *Kvdb) GetJson(key []byte, value interface{}, tran bool) error {
func (k *Kvdb) GetJson(key []byte, value interface{}, tran *leveldb.Transaction) error {
data, err := k.Get(key, tran)
if err != nil {
return err
Expand All @@ -220,7 +219,7 @@ func (k *Kvdb) GetJson(key []byte, value interface{}, tran bool) error {
return nil
}

func (k *Kvdb) GetFirst(tran bool) ([]byte, []byte, error) {
func (k *Kvdb) GetFirst(tran *leveldb.Transaction) ([]byte, []byte, error) {
iter := k.newIter(nil, tran)
defer iter.Release()
if !iter.First() {
Expand All @@ -229,7 +228,7 @@ func (k *Kvdb) GetFirst(tran bool) ([]byte, []byte, error) {
return iter.Key(), iter.Value(), nil
}

func (k *Kvdb) GetLast(tran bool) ([]byte, []byte, error) {
func (k *Kvdb) GetLast(tran *leveldb.Transaction) ([]byte, []byte, error) {
iter := k.newIter(nil, tran)
defer iter.Release()
if !iter.Last() {
Expand All @@ -238,7 +237,7 @@ func (k *Kvdb) GetLast(tran bool) ([]byte, []byte, error) {
return iter.Key(), iter.Value(), nil
}

func (k *Kvdb) GetJsonFirst(value interface{}, tran bool) ([]byte, error) {
func (k *Kvdb) GetJsonFirst(value interface{}, tran *leveldb.Transaction) ([]byte, error) {
iter := k.newIter(nil, tran)
defer iter.Release()
if !iter.First() {
Expand All @@ -255,7 +254,7 @@ func (k *Kvdb) GetJsonFirst(value interface{}, tran bool) ([]byte, error) {
return iter.Key(), nil
}

func (k *Kvdb) GetJsonLast(value interface{}, tran bool) ([]byte, error) {
func (k *Kvdb) GetJsonLast(value interface{}, tran *leveldb.Transaction) ([]byte, error) {
iter := k.newIter(nil, tran)
defer iter.Release()
if !iter.Last() {
Expand All @@ -272,7 +271,7 @@ func (k *Kvdb) GetJsonLast(value interface{}, tran bool) ([]byte, error) {
return iter.Key(), nil
}

func (k *Kvdb) GetLastKey(tran bool) ([]byte, error) {
func (k *Kvdb) GetLastKey(tran *leveldb.Transaction) ([]byte, error) {
iter := k.newIter(nil, tran)
defer iter.Release()
if !iter.Last() {
Expand All @@ -281,9 +280,9 @@ func (k *Kvdb) GetLastKey(tran bool) ([]byte, error) {
return iter.Key(), nil
}

func (k *Kvdb) Put(key, value []byte, ttl int, tran bool) error {
if tran && k.tran != nil {
err := k.tran.Put(key, value, nil)
func (k *Kvdb) Put(key, value []byte, ttl int, tran *leveldb.Transaction) error {
if tran != nil {
err := tran.Put(key, value, nil)
if err != nil {
return err
}
Expand All @@ -299,30 +298,27 @@ func (k *Kvdb) Put(key, value []byte, ttl int, tran bool) error {
return nil
}

func (k *Kvdb) OpenTransaction() error {
func (k *Kvdb) OpenTransaction() (*leveldb.Transaction, error) {
tran, err := k.db.OpenTransaction()
if err != nil {
return err
return nil, err
}
k.tran = tran
return nil
return tran, nil
}

func (k *Kvdb) Commit() error {
err := k.tran.Commit()
func (k *Kvdb) Commit(tran *leveldb.Transaction) error {
err := tran.Commit()
if err != nil {
return err
}
k.tran = nil
return nil
}

func (k *Kvdb) Discard() {
k.tran.Discard()
k.tran = nil
func (k *Kvdb) Discard(tran *leveldb.Transaction) {
tran.Discard()
}

func (k *Kvdb) PutObject(key []byte, value interface{}, ttl int, tran bool) error {
func (k *Kvdb) PutObject(key []byte, value interface{}, ttl int, tran *leveldb.Transaction) error {
t := reflect.ValueOf(value)
if t.Kind() == reflect.Ptr {
t = t.Elem()
Expand All @@ -334,7 +330,7 @@ func (k *Kvdb) PutObject(key []byte, value interface{}, ttl int, tran bool) erro
return k.Put(key, msg, ttl, tran)
}

func (k *Kvdb) PutJson(key []byte, value interface{}, ttl int, tran bool) error {
func (k *Kvdb) PutJson(key []byte, value interface{}, ttl int, tran *leveldb.Transaction) error {
t := reflect.ValueOf(value)
if t.Kind() == reflect.Ptr {
t = t.Elem()
Expand All @@ -346,7 +342,7 @@ func (k *Kvdb) PutJson(key []byte, value interface{}, ttl int, tran bool) error
return k.Put(key, msg, ttl, tran)
}

func (k *Kvdb) BatPutOrDel(items *[]BatItem, tran bool) error {
func (k *Kvdb) BatPutOrDel(items *[]BatItem, tran *leveldb.Transaction) error {
batch := new(leveldb.Batch)
for _, v := range *items {
switch v.Op {
Expand All @@ -362,8 +358,8 @@ func (k *Kvdb) BatPutOrDel(items *[]BatItem, tran bool) error {
}
}
}
if tran && k.tran != nil {
err := k.tran.Write(batch, nil)
if tran != nil {
err := tran.Write(batch, nil)
if err != nil {
return err
}
Expand All @@ -376,9 +372,9 @@ func (k *Kvdb) BatPutOrDel(items *[]BatItem, tran bool) error {
return nil
}

func (k *Kvdb) Del(key []byte, tran bool) error {
if tran && k.tran != nil {
err := k.tran.Delete(key, nil)
func (k *Kvdb) Del(key []byte, tran *leveldb.Transaction) error {
if tran != nil {
err := tran.Delete(key, nil)
if err != nil {
return err
}
Expand All @@ -394,15 +390,15 @@ func (k *Kvdb) Del(key []byte, tran bool) error {
return nil
}

func (k *Kvdb) newIter(slice *util.Range, tran bool) iterator.Iterator {
if tran && k.tran != nil {
return k.tran.NewIterator(slice, k.iteratorOpts)
func (k *Kvdb) newIter(slice *util.Range, tran *leveldb.Transaction) iterator.Iterator {
if tran != nil {
return tran.NewIterator(slice, k.iteratorOpts)
} else {
return k.db.NewIterator(slice, k.iteratorOpts)
}
}

func (k *Kvdb) AllByObject(Ntype interface{}, tran bool) []KvItem {
func (k *Kvdb) AllByObject(Ntype interface{}, tran *leveldb.Transaction) []KvItem {
nt := reflect.TypeOf(Ntype)
if nt.Kind() == reflect.Ptr {
nt = nt.Elem()
Expand All @@ -424,7 +420,7 @@ func (k *Kvdb) AllByObject(Ntype interface{}, tran bool) []KvItem {
return result
}

func (k *Kvdb) AllByJson(Ntype interface{}, tran bool) []KvItem {
func (k *Kvdb) AllByJson(Ntype interface{}, tran *leveldb.Transaction) []KvItem {
nt := reflect.TypeOf(Ntype)
if nt.Kind() == reflect.Ptr {
nt = nt.Elem()
Expand All @@ -446,7 +442,7 @@ func (k *Kvdb) AllByJson(Ntype interface{}, tran bool) []KvItem {
return result
}

func (k *Kvdb) AllByKV(tran bool) []KvItem {
func (k *Kvdb) AllByKV(tran *leveldb.Transaction) []KvItem {
result := make([]KvItem, 0)
iter := k.newIter(nil, tran)
for iter.Next() {
Expand All @@ -461,7 +457,7 @@ func (k *Kvdb) AllByKV(tran bool) []KvItem {
return result
}

func (k *Kvdb) AllKeys(tran bool) []string {
func (k *Kvdb) AllKeys(tran *leveldb.Transaction) []string {
var keys []string
iter := k.newIter(nil, tran)
for iter.Next() {
Expand All @@ -471,7 +467,7 @@ func (k *Kvdb) AllKeys(tran bool) []string {
return keys
}

func (k *Kvdb) RegexpKeys(exp string, tran bool) ([]string, error) {
func (k *Kvdb) RegexpKeys(exp string, tran *leveldb.Transaction) ([]string, error) {
regx, err := regexp.Compile(exp)
if err != nil {
return nil, err
Expand All @@ -487,7 +483,7 @@ func (k *Kvdb) RegexpKeys(exp string, tran bool) ([]string, error) {
return keys, nil
}

func (k *Kvdb) RegexpByKV(exp string, tran bool) ([]KvItem, error) {
func (k *Kvdb) RegexpByKV(exp string, tran *leveldb.Transaction) ([]KvItem, error) {
regx, err := regexp.Compile(exp)
if err != nil {
return nil, err
Expand All @@ -508,7 +504,7 @@ func (k *Kvdb) RegexpByKV(exp string, tran bool) ([]KvItem, error) {
return result, nil
}

func (k *Kvdb) KeyStartDels(key []byte, tran bool) error {
func (k *Kvdb) KeyStartDels(key []byte, tran *leveldb.Transaction) error {
batch := new(leveldb.Batch)
iter := k.newIter(util.BytesPrefix(key), tran)
for iter.Next() {
Expand All @@ -522,7 +518,7 @@ func (k *Kvdb) KeyStartDels(key []byte, tran bool) error {
return nil
}

func (k *Kvdb) KeyStartKeys(key []byte, tran bool) []string {
func (k *Kvdb) KeyStartKeys(key []byte, tran *leveldb.Transaction) []string {
var keys []string
iter := k.newIter(util.BytesPrefix(key), tran)
for iter.Next() {
Expand All @@ -532,17 +528,17 @@ func (k *Kvdb) KeyStartKeys(key []byte, tran bool) []string {
return keys
}

func (k *Kvdb) Iter(tran bool) iterator.Iterator {
if tran && k.tran != nil {
return k.tran.NewIterator(nil, k.iteratorOpts)
func (k *Kvdb) Iter(tran *leveldb.Transaction) iterator.Iterator {
if tran != nil {
return tran.NewIterator(nil, k.iteratorOpts)
} else {
return k.db.NewIterator(nil, k.iteratorOpts)
}
}

func (k *Kvdb) IterStartWith(key []byte, tran bool) (iterator.Iterator, error) {
if tran && k.tran != nil {
return k.tran.NewIterator(util.BytesPrefix(key), k.iteratorOpts), nil
func (k *Kvdb) IterStartWith(key []byte, tran *leveldb.Transaction) (iterator.Iterator, error) {
if tran != nil {
return tran.NewIterator(util.BytesPrefix(key), k.iteratorOpts), nil
} else {
return k.db.NewIterator(util.BytesPrefix(key), k.iteratorOpts), nil
}
Expand All @@ -552,7 +548,7 @@ func (k *Kvdb) IterRelease(iter iterator.Iterator) {
iter.Release()
}

func (k *Kvdb) KeyStart(key []byte, tran bool) ([]KvItem, error) {
func (k *Kvdb) KeyStart(key []byte, tran *leveldb.Transaction) ([]KvItem, error) {
result := make([]KvItem, 0)
iter := k.newIter(util.BytesPrefix(key), tran)
for iter.Next() {
Expand All @@ -567,7 +563,7 @@ func (k *Kvdb) KeyStart(key []byte, tran bool) ([]KvItem, error) {
return result, nil
}

func (k *Kvdb) KeyStartByObject(key []byte, Ntype interface{}, tran bool) ([]KvItem, error) {
func (k *Kvdb) KeyStartByObject(key []byte, Ntype interface{}, tran *leveldb.Transaction) ([]KvItem, error) {
nt := reflect.TypeOf(Ntype)
if nt.Kind() == reflect.Ptr {
nt = nt.Elem()
Expand All @@ -589,7 +585,7 @@ func (k *Kvdb) KeyStartByObject(key []byte, Ntype interface{}, tran bool) ([]KvI
return result, nil
}

func (k *Kvdb) RegexpByObject(exp string, Ntype interface{}, tran bool) ([]KvItem, error) {
func (k *Kvdb) RegexpByObject(exp string, Ntype interface{}, tran *leveldb.Transaction) ([]KvItem, error) {
regx, err := regexp.Compile(exp)
if err != nil {
return nil, err
Expand Down Expand Up @@ -617,7 +613,7 @@ func (k *Kvdb) RegexpByObject(exp string, Ntype interface{}, tran bool) ([]KvIte
return result, nil
}

func (k *Kvdb) KeyRange(min, max []byte, tran bool) ([]KvItem, error) {
func (k *Kvdb) KeyRange(min, max []byte, tran *leveldb.Transaction) ([]KvItem, error) {
result := make([]KvItem, 0)
iter := k.newIter(nil, tran)
for ok := iter.Seek(min); ok && bytes.Compare(iter.Key(), max) <= 0; ok = iter.Next() {
Expand All @@ -632,7 +628,7 @@ func (k *Kvdb) KeyRange(min, max []byte, tran bool) ([]KvItem, error) {
return result, nil
}

func (k *Kvdb) KeyRangeByObject(min, max []byte, Ntype interface{}, tran bool) ([]KvItem, error) {
func (k *Kvdb) KeyRangeByObject(min, max []byte, Ntype interface{}, tran *leveldb.Transaction) ([]KvItem, error) {
nt := reflect.TypeOf(Ntype)
if nt.Kind() == reflect.Ptr {
nt = nt.Elem()
Expand Down
Loading

0 comments on commit 6515ea8

Please sign in to comment.