diff --git a/client/blast b/client/blast deleted file mode 100755 index 56e7871..0000000 Binary files a/client/blast and /dev/null differ diff --git a/client/cmd/sizes/main.go b/client/cmd/sizes/main.go index 7cb237d..589e342 100644 --- a/client/cmd/sizes/main.go +++ b/client/cmd/sizes/main.go @@ -40,7 +40,8 @@ func main() { wg.Add(f.NumWorkers) for i := 0; i < f.NumWorkers; i++ { - go func(prot common.Prot, wg *sync.WaitGroup) { + go func(prot common.Prot, wg *sync.WaitGroup, id int) { + conn, err := common.Connect("localhost", f.Port) if err != nil { panic("Couldn't connect") @@ -48,9 +49,21 @@ func main() { r := rand.New(rand.NewSource(common.RandSeed())) rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + var i int + + defer func() { + if r := recover(); r != nil { + fmt.Println("UH OH", id, i) + } + }() + + // 0 to 1MB data in 100 byte increments + for i = 0; i < 1048576; i += 100 { + + /*if i%10000 == 0 { + fmt.Println(id, i) + }*/ - // 0 to 100k data - for i := 0; i < 102400; i++ { key := common.RandData(r, f.KeyLength, false) value := common.RandData(nil, i, true) @@ -60,7 +73,7 @@ func main() { fmt.Println("Done.") wg.Done() - }(prot, wg) + }(prot, wg, i) } wg.Wait() diff --git a/client/common/utils.go b/client/common/utils.go index af10fc2..3c3e06a 100644 --- a/client/common/utils.go +++ b/client/common/utils.go @@ -25,7 +25,7 @@ import "time" // constants and configuration var letters = bytes.Repeat([]byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ"), 10) -const predataLength = 101 * 1024 +const predataLength = 10 * 1024 * 1024 var predata []byte diff --git a/client/fill b/client/fill deleted file mode 100755 index bdd9f67..0000000 Binary files a/client/fill and /dev/null differ diff --git a/client/setget b/client/setget deleted file mode 100755 index 41bcb95..0000000 Binary files a/client/setget and /dev/null differ diff --git a/client/setops b/client/setops deleted file mode 100755 index 5890d90..0000000 Binary files a/client/setops and /dev/null differ diff --git a/client/sizes b/client/sizes deleted file mode 100755 index 57802e7..0000000 Binary files a/client/sizes and /dev/null differ diff --git a/metrics/counters.go b/metrics/counters.go index f36ff34..16b3739 100644 --- a/metrics/counters.go +++ b/metrics/counters.go @@ -16,7 +16,7 @@ package metrics import "sync/atomic" -const maxNumCounters = 1024 +const maxNumCounters = 10240 var ( cnames = make([]string, maxNumCounters) diff --git a/orcas/l1l2.go b/orcas/l1l2.go index ebf667b..4f9f0df 100644 --- a/orcas/l1l2.go +++ b/orcas/l1l2.go @@ -57,21 +57,20 @@ func (l *L1L2Orca) Set(req common.SetRequest) error { } metrics.IncCounter(MetricCmdSetSuccessL2) - // Now set in L1. If L1 fails, we log the error and fail the request. + // Now set in L1. If L1 fails, we log the error but do not fail the request. // If a user was writing a new piece of information, the error would be OK, // since the next GET would be able to put the L2 information back into L1. // In the case that the user was overwriting information, a failed set in L1 - // and successful one in L2 would leave us inconsistent. If the response was - // positive in this situation, it would look like the server successfully - // processed the request but didn't store the information. Clients will - // retry failed writes. In this case L2 will get two writes for the same key - // but this is better because it is more correct overall, though less - // efficient. Note there are no retries at this level. + // and successful one in L2 would leave us inconsistent. In this case we do + // a delete from L1 and say the request was successful. This should keep the + // data store more consistent overall at the expense of a small bit of speed. // // It should be noted that errors on a straight set are nearly always fatal // for the connection. It's likely that if this branch is taken that the // connections to everyone will be severed (for this one client connection) - // and that the client will reconnect to try again. + // and that the client will reconnect to try again. The one exception is when + // the server is so busy that it cannot clear enough memory for the data to + // be stored, in which case the delete may work just fine. metrics.IncCounter(MetricCmdSetL1) start = timer.Now() @@ -81,8 +80,30 @@ func (l *L1L2Orca) Set(req common.SetRequest) error { if err != nil { metrics.IncCounter(MetricCmdSetErrorsL1) - metrics.IncCounter(MetricCmdSetErrors) - return err + + metrics.IncCounter(MetricCmdSetL1ErrorDeleteL1) + + // in order to ensure consistency, attempt a delete from L1 + // For keys that are unable to be set in L1 but were successfully set in + // L2 this may cause a shift in load. These keys tend to be large so this + // will probably put a significant burden on L2 if the data are large. + // Note that even if there's a major problem, e.g. the connection being + // closed, this will still return success. + dcmd := common.DeleteRequest{ + Key: req.Key, + } + + start = timer.Now() + err = l.l1.Delete(dcmd) + metrics.ObserveHist(HistDeleteL1, timer.Since(start)) + + if err == common.ErrKeyNotFound { + metrics.IncCounter(MetricCmdSetL1ErrorDeleteMissesL1) + } else if err != nil { + metrics.IncCounter(MetricCmdSetL1ErrorDeleteErrorsL1) + } else { + metrics.IncCounter(MetricCmdSetL1ErrorDeleteHitsL1) + } } metrics.IncCounter(MetricCmdSetSuccessL1) @@ -615,7 +636,33 @@ func (l *L1L2Orca) Get(req common.GetRequest) error { if err != nil { metrics.IncCounter(MetricCmdGetSetErrorsL1) - return err + + // TODO: REVIEW TO END OF BLOCK + metrics.IncCounter(MetricCmdGetSetErrorL1DeleteL1) + + // in order to ensure consistency, attempt a delete from L1 + // For keys that are unable to be set in L1 but were successfully set in + // L2 this may cause a shift in load. These keys tend to be large so this + // will probably put a significant burden on L2 if the data are large. + // Note that even if there's a major problem, e.g. the connection being + // closed, this will still return success. + dcmd := common.DeleteRequest{ + Key: res.Key, + } + + start = timer.Now() + err = l.l1.Delete(dcmd) + metrics.ObserveHist(HistDeleteL1, timer.Since(start)) + + if err == common.ErrKeyNotFound { + metrics.IncCounter(MetricCmdGetSetErrorL1DeleteMissesL1) + } else if err != nil { + metrics.IncCounter(MetricCmdGetSetErrorL1DeleteErrorsL1) + } else { + metrics.IncCounter(MetricCmdGetSetErrorL1DeleteHitsL1) + } + + err = nil } metrics.IncCounter(MetricCmdGetSetSucessL1) diff --git a/orcas/l1l2_test.go b/orcas/l1l2_test.go new file mode 100644 index 0000000..aa3e055 --- /dev/null +++ b/orcas/l1l2_test.go @@ -0,0 +1,243 @@ +// Copyright 2017 Netflix, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package orcas_test + +import ( + "bufio" + "bytes" + "io" + "testing" + + "github.com/netflix/rend/common" + "github.com/netflix/rend/orcas" + "github.com/netflix/rend/protocol/textprot" +) + +func TestL1L2Orca(t *testing.T) { + t.Run("Set", func(t *testing.T) { + t.Run("L2SetSuccess", func(t *testing.T) { + t.Run("L1SetError", func(t *testing.T) { + t.Run("L1DeleteHit", func(t *testing.T) { + h1 := &testHandler{ + errors: []error{common.ErrNoMem, nil}, + } + h2 := &testHandler{ + errors: []error{nil}, + } + output := &bytes.Buffer{} + + l1l2 := orcas.L1L2(h1, h2, textprot.NewTextResponder(bufio.NewWriter(output))) + + err := l1l2.Set(common.SetRequest{}) + if err != nil { + t.Fatalf("Error should be nil, got %v", err) + } + + out := string(output.Bytes()) + + t.Logf(out) + + if out != "STORED\r\n" { + t.Fatalf("Expected response 'STORED\\r\\n' but got '%v'", out) + } + + h1.verifyEmpty(t) + h2.verifyEmpty(t) + }) + t.Run("L1DeleteMiss", func(t *testing.T) { + h1 := &testHandler{ + errors: []error{common.ErrNoMem, common.ErrKeyNotFound}, + } + h2 := &testHandler{ + errors: []error{nil}, + } + output := &bytes.Buffer{} + + l1l2 := orcas.L1L2(h1, h2, textprot.NewTextResponder(bufio.NewWriter(output))) + + err := l1l2.Set(common.SetRequest{}) + if err != nil { + t.Fatalf("Error should be nil, got %v", err) + } + + out := string(output.Bytes()) + + t.Logf(out) + + if out != "STORED\r\n" { + t.Fatalf("Expected response 'STORED\\r\\n' but got '%v'", out) + } + + h1.verifyEmpty(t) + h2.verifyEmpty(t) + }) + t.Run("L1DeleteMiss", func(t *testing.T) { + h1 := &testHandler{ + errors: []error{common.ErrNoMem, io.EOF}, + } + h2 := &testHandler{ + errors: []error{nil}, + } + output := &bytes.Buffer{} + + l1l2 := orcas.L1L2(h1, h2, textprot.NewTextResponder(bufio.NewWriter(output))) + + err := l1l2.Set(common.SetRequest{}) + if err != nil { + t.Fatalf("Error should be nil, got %v", err) + } + + out := string(output.Bytes()) + + t.Logf(out) + + if out != "STORED\r\n" { + t.Fatalf("Expected response 'STORED\\r\\n' but got '%v'", out) + } + + h1.verifyEmpty(t) + h2.verifyEmpty(t) + }) + }) + }) + }) + t.Run("Get", func(t *testing.T) { + t.Run("L1Miss", func(t *testing.T) { + t.Run("L2Hit", func(t *testing.T) { + t.Run("L1SetFailure", func(t *testing.T) { + t.Run("L1DeleteHit", func(t *testing.T) { + h1 := &testHandler{ + errors: []error{common.ErrNoMem, nil}, + responses: []common.GetResponse{ + common.GetResponse{ + Miss: true, + }, + }, + } + h2 := &testHandler{ + eresponses: []common.GetEResponse{ + common.GetEResponse{ + Key: []byte("key"), + Data: []byte("foo"), + }, + }, + } + output := &bytes.Buffer{} + + l1l2 := orcas.L1L2(h1, h2, textprot.NewTextResponder(bufio.NewWriter(output))) + + err := l1l2.Get(common.GetRequest{ + Keys: [][]byte{[]byte("key")}, + Opaques: []uint32{0}, + Quiet: []bool{false}, + NoopEnd: false, + }) + if err != nil { + t.Fatalf("Error should be nil, got %v", err) + } + + out := string(output.Bytes()) + + t.Logf(out) + gold := "VALUE key 0 3\r\nfoo\r\nEND\r\n" + + if out != gold { + t.Fatalf("Expected response '%v' but got '%v'", gold, out) + } + + h1.verifyEmpty(t) + h2.verifyEmpty(t) + }) + t.Run("L1DeleteMiss", func(t *testing.T) { + h1 := &testHandler{ + errors: []error{common.ErrNoMem, common.ErrKeyNotFound}, + responses: []common.GetResponse{ + common.GetResponse{ + Miss: true, + }, + }, + } + h2 := &testHandler{ + eresponses: []common.GetEResponse{ + common.GetEResponse{ + Key: []byte("key"), + Data: []byte("foo"), + }, + }, + } + output := &bytes.Buffer{} + + l1l2 := orcas.L1L2(h1, h2, textprot.NewTextResponder(bufio.NewWriter(output))) + + err := l1l2.Get(common.GetRequest{}) + if err != nil { + t.Fatalf("Error should be nil, got %v", err) + } + + out := string(output.Bytes()) + + t.Logf(out) + gold := "VALUE key 0 3\r\nfoo\r\nEND\r\n" + + if out != gold { + t.Fatalf("Expected response '%v' but got '%v'", gold, out) + } + + h1.verifyEmpty(t) + h2.verifyEmpty(t) + }) + t.Run("L1DeleteError", func(t *testing.T) { + h1 := &testHandler{ + errors: []error{common.ErrNoMem, io.EOF}, + responses: []common.GetResponse{ + common.GetResponse{ + Miss: true, + }, + }, + } + h2 := &testHandler{ + eresponses: []common.GetEResponse{ + common.GetEResponse{ + Key: []byte("key"), + Data: []byte("foo"), + }, + }, + } + output := &bytes.Buffer{} + + l1l2 := orcas.L1L2(h1, h2, textprot.NewTextResponder(bufio.NewWriter(output))) + + err := l1l2.Get(common.GetRequest{}) + if err != nil { + t.Fatalf("Error should be nil, got %v", err) + } + + out := string(output.Bytes()) + + t.Logf(out) + gold := "VALUE key 0 3\r\nfoo\r\nEND\r\n" + + if out != gold { + t.Fatalf("Expected response '%v' but got '%v'", gold, out) + } + + h1.verifyEmpty(t) + h2.verifyEmpty(t) + }) + }) + }) + }) + }) +} diff --git a/orcas/l1l2batch.go b/orcas/l1l2batch.go index 3996e85..b386b70 100644 --- a/orcas/l1l2batch.go +++ b/orcas/l1l2batch.go @@ -72,8 +72,30 @@ func (l *L1L2BatchOrca) Set(req common.SetRequest) error { metrics.IncCounter(MetricCmdSetReplaceNotStoredL1) } else { metrics.IncCounter(MetricCmdSetReplaceErrorsL1) - metrics.IncCounter(MetricCmdSetErrors) - return err + + metrics.IncCounter(MetricsCmdSetReplaceL1ErrorDeleteL1) + + // in order to ensure consistency, attempt a last-ditch delete from L1 + // For keys that are unable to be set in L1 but were successfully set in + // L2 this may cause a shift in load. These keys tend to be large so this + // will probably put a significant burden on L2 if the keys are expensive. + // Note that even if there's a major problem, e.g. the connection being + // closed, this will still return success. + dcmd := common.DeleteRequest{ + Key: req.Key, + } + + start = timer.Now() + err = l.l1.Delete(dcmd) + metrics.ObserveHist(HistDeleteL1, timer.Since(start)) + + if err == common.ErrKeyNotFound { + metrics.IncCounter(MetricsCmdSetReplaceL1ErrorDeleteMissesL1) + } else if err != nil { + metrics.IncCounter(MetricsCmdSetReplaceL1ErrorDeleteErrorsL1) + } else { + metrics.IncCounter(MetricsCmdSetReplaceL1ErrorDeleteHitsL1) + } } } else { metrics.IncCounter(MetricCmdSetReplaceStoredL1) diff --git a/orcas/l1l2batch_test.go b/orcas/l1l2batch_test.go new file mode 100644 index 0000000..09e0e49 --- /dev/null +++ b/orcas/l1l2batch_test.go @@ -0,0 +1,116 @@ +// Copyright 2017 Netflix, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package orcas_test + +import ( + "bufio" + "bytes" + "io" + "testing" + + "github.com/netflix/rend/common" + "github.com/netflix/rend/orcas" + "github.com/netflix/rend/protocol/textprot" +) + +func TestL1L2BatchOrca(t *testing.T) { + t.Run("Set", func(t *testing.T) { + t.Run("L2SetSuccess", func(t *testing.T) { + t.Run("L1SetError", func(t *testing.T) { + t.Run("L1DeleteHit", func(t *testing.T) { + h1 := &testHandler{ + errors: []error{common.ErrNoMem, nil}, + } + h2 := &testHandler{ + errors: []error{nil}, + } + output := &bytes.Buffer{} + + l1l2 := orcas.L1L2Batch(h1, h2, textprot.NewTextResponder(bufio.NewWriter(output))) + + err := l1l2.Set(common.SetRequest{}) + if err != nil { + t.Fatalf("Error should be nil, got %v", err) + } + + out := string(output.Bytes()) + + t.Logf(out) + + if out != "STORED\r\n" { + t.Fatalf("Expected response 'STORED\\r\\n' but got '%v'", out) + } + + h1.verifyEmpty(t) + h2.verifyEmpty(t) + }) + t.Run("L1DeleteMiss", func(t *testing.T) { + h1 := &testHandler{ + errors: []error{common.ErrNoMem, common.ErrKeyNotFound}, + } + h2 := &testHandler{ + errors: []error{nil}, + } + output := &bytes.Buffer{} + + l1l2 := orcas.L1L2Batch(h1, h2, textprot.NewTextResponder(bufio.NewWriter(output))) + + err := l1l2.Set(common.SetRequest{}) + if err != nil { + t.Fatalf("Error should be nil, got %v", err) + } + + out := string(output.Bytes()) + + t.Logf(out) + + if out != "STORED\r\n" { + t.Fatalf("Expected response 'STORED\\r\\n' but got '%v'", out) + } + + h1.verifyEmpty(t) + h2.verifyEmpty(t) + }) + t.Run("L1DeleteMiss", func(t *testing.T) { + h1 := &testHandler{ + errors: []error{common.ErrNoMem, io.EOF}, + } + h2 := &testHandler{ + errors: []error{nil}, + } + output := &bytes.Buffer{} + + l1l2 := orcas.L1L2Batch(h1, h2, textprot.NewTextResponder(bufio.NewWriter(output))) + + err := l1l2.Set(common.SetRequest{}) + if err != nil { + t.Fatalf("Error should be nil, got %v", err) + } + + out := string(output.Bytes()) + + t.Logf(out) + + if out != "STORED\r\n" { + t.Fatalf("Expected response 'STORED\\r\\n' but got '%v'", out) + } + + h1.verifyEmpty(t) + h2.verifyEmpty(t) + }) + }) + }) + }) +} diff --git a/orcas/types.go b/orcas/types.go index bdc5d28..d07e88b 100644 --- a/orcas/types.go +++ b/orcas/types.go @@ -57,11 +57,15 @@ var ( MetricCmdGetKeysL1 = metrics.AddCounter("cmd_get_keys_l1", nil) MetricCmdGetKeysL2 = metrics.AddCounter("cmd_get_keys_l2", nil) - // Batch L1L2 get metrics MetricCmdGetSetL1 = metrics.AddCounter("cmd_get_set_l1", nil) MetricCmdGetSetErrorsL1 = metrics.AddCounter("cmd_get_set_errors_l1", nil) MetricCmdGetSetSucessL1 = metrics.AddCounter("cmd_get_set_success_l1", nil) + MetricCmdGetSetErrorL1DeleteL1 = metrics.AddCounter("cmd_get_set_l1_error_delete_l1", nil) + MetricCmdGetSetErrorL1DeleteHitsL1 = metrics.AddCounter("cmd_get_set_l1_error_delete_hits_l1", nil) + MetricCmdGetSetErrorL1DeleteMissesL1 = metrics.AddCounter("cmd_get_set_l1_error_delete_misses_l1", nil) + MetricCmdGetSetErrorL1DeleteErrorsL1 = metrics.AddCounter("cmd_get_set_l1_error_delete_errors_l1", nil) + MetricCmdGetEL1 = metrics.AddCounter("cmd_gete_l1", nil) MetricCmdGetEL2 = metrics.AddCounter("cmd_gete_l2", nil) MetricCmdGetEHits = metrics.AddCounter("cmd_gete_hits", nil) @@ -86,12 +90,24 @@ var ( MetricCmdSetErrorsL1 = metrics.AddCounter("cmd_set_errors_l1", nil) MetricCmdSetErrorsL2 = metrics.AddCounter("cmd_set_errors_l2", nil) + // L1L2 delete after failed L1 set metrics + MetricCmdSetL1ErrorDeleteL1 = metrics.AddCounter("cmd_set_l1_error_delete_l1", nil) + MetricCmdSetL1ErrorDeleteHitsL1 = metrics.AddCounter("cmd_set_l1_error_delete_hits_l1", nil) + MetricCmdSetL1ErrorDeleteMissesL1 = metrics.AddCounter("cmd_set_l1_error_delete_misses_l1", nil) + MetricCmdSetL1ErrorDeleteErrorsL1 = metrics.AddCounter("cmd_set_l1_error_delete_errors_l1", nil) + // Batch L1L2 set metrics MetricCmdSetReplaceL1 = metrics.AddCounter("cmd_set_replace_l1", nil) MetricCmdSetReplaceNotStoredL1 = metrics.AddCounter("cmd_set_replace_not_stored_l1", nil) MetricCmdSetReplaceErrorsL1 = metrics.AddCounter("cmd_set_replace_errors_l1", nil) MetricCmdSetReplaceStoredL1 = metrics.AddCounter("cmd_set_replace_stored_l1", nil) + // Batch L1L2 delete after failed set metrics + MetricsCmdSetReplaceL1ErrorDeleteL1 = metrics.AddCounter("cmd_set_replace_l1_error_delete_l1", nil) + MetricsCmdSetReplaceL1ErrorDeleteHitsL1 = metrics.AddCounter("cmd_set_replace_l1_error_delete_hits_l1", nil) + MetricsCmdSetReplaceL1ErrorDeleteMissesL1 = metrics.AddCounter("cmd_set_replace_l1_error_delete_misses_l1", nil) + MetricsCmdSetReplaceL1ErrorDeleteErrorsL1 = metrics.AddCounter("cmd_set_replace_l1_error_delete_errors_l1", nil) + MetricCmdAddL1 = metrics.AddCounter("cmd_add_l1", nil) MetricCmdAddL2 = metrics.AddCounter("cmd_add_l2", nil) MetricCmdAddStored = metrics.AddCounter("cmd_add_stored", nil) diff --git a/orcas/util_test.go b/orcas/util_test.go new file mode 100644 index 0000000..067a6dd --- /dev/null +++ b/orcas/util_test.go @@ -0,0 +1,135 @@ +// Copyright 2017 Netflix, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package orcas_test + +import ( + "testing" + + "github.com/netflix/rend/common" +) + +type testHandler struct { + errors []error + responses []common.GetResponse + eresponses []common.GetEResponse +} + +func (h *testHandler) verifyEmpty(t *testing.T) { + if len(h.errors) > 0 { + t.Fatalf("Expected errors to be empty. Left over: %#v", h.errors) + } + if len(h.responses) > 0 { + t.Fatalf("Expected errors to be empty. Left over: %#v", h.responses) + } + if len(h.eresponses) > 0 { + t.Fatalf("Expected errors to be empty. Left over: %#v", h.eresponses) + } +} + +func (h *testHandler) Set(cmd common.SetRequest) error { + ret := h.errors[0] + h.errors = h.errors[1:] + return ret +} +func (h *testHandler) Add(cmd common.SetRequest) error { + ret := h.errors[0] + h.errors = h.errors[1:] + return ret +} +func (h *testHandler) Replace(cmd common.SetRequest) error { + ret := h.errors[0] + h.errors = h.errors[1:] + return ret +} +func (h *testHandler) Append(cmd common.SetRequest) error { + ret := h.errors[0] + h.errors = h.errors[1:] + return ret +} +func (h *testHandler) Prepend(cmd common.SetRequest) error { + ret := h.errors[0] + h.errors = h.errors[1:] + return ret +} +func (h *testHandler) Get(cmd common.GetRequest) (<-chan common.GetResponse, <-chan error) { + if len(h.responses) > 0 { + res := h.responses[0] + h.responses = h.responses[1:] + reschan := make(chan common.GetResponse, 1) + reschan <- res + close(reschan) + + errchan := make(chan error) + close(errchan) + + return reschan, errchan + } + + reschan := make(chan common.GetResponse) + close(reschan) + + err := h.errors[0] + h.errors = h.errors[1:] + errchan := make(chan error, 1) + errchan <- err + close(errchan) + + return reschan, errchan +} +func (h *testHandler) GetE(cmd common.GetRequest) (<-chan common.GetEResponse, <-chan error) { + if len(h.eresponses) > 0 { + res := h.eresponses[0] + h.eresponses = h.eresponses[1:] + reschan := make(chan common.GetEResponse, 1) + reschan <- res + close(reschan) + + errchan := make(chan error) + close(errchan) + + return reschan, errchan + } + + reschan := make(chan common.GetEResponse) + close(reschan) + + err := h.errors[0] + h.errors = h.errors[1:] + errchan := make(chan error, 1) + errchan <- err + close(errchan) + + return reschan, errchan +} +func (h *testHandler) GAT(cmd common.GATRequest) (common.GetResponse, error) { + ret := h.errors[0] + h.errors = h.errors[1:] + return common.GetResponse{}, ret +} +func (h *testHandler) Delete(cmd common.DeleteRequest) error { + ret := h.errors[0] + h.errors = h.errors[1:] + return ret +} +func (h *testHandler) Touch(cmd common.TouchRequest) error { + ret := h.errors[0] + h.errors = h.errors[1:] + return ret +} +func (h *testHandler) Close() error { + ret := h.errors[0] + h.errors = h.errors[1:] + return ret +}