-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathfirehose_test.go
103 lines (94 loc) · 3.9 KB
/
firehose_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package kinetic
import (
"runtime"
"sync/atomic"
"testing"
"time"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/firehose"
. "github.com/smartystreets/goconvey/convey"
)
func TestFireHose(t *testing.T) {
producer, _ := new(Firehose).InitC("your-stream", "", "", "accesskey", "secretkey", "us-east-1", 4)
producer.NewEndpoint("localhost", "your-stream")
producer.(*Firehose).client = new(fakefirehose)
producer.ReInit()
Convey("Given a running firehose producer", t, func() {
Convey("it should send data to the firehose stream", func() {
for i := 0; i < 100; i++ {
producer.Send(new(Message).Init([]byte("this is a message"), ""))
runtime.Gosched()
}
time.Sleep(10 * time.Second)
So(atomic.LoadInt64(&(producer.(*Firehose).client.(*fakefirehose).count)), ShouldEqual, 100)
So(producer.(*Firehose).getMsgCount(), ShouldEqual, 100)
})
})
}
func TestFireHoseSendSync(t *testing.T) {
producer, _ := new(Firehose).InitC("your-stream", "", "", "accesskey", "secretkey", "us-east-1", 4)
producer.NewEndpoint("localhost", "your-stream")
producer.(*Firehose).client = new(fakefirehose)
producer.ReInit()
Convey("Given a running firehose producer", t, func() {
Convey("it should send data to the firehose stream", func() {
producer.Send(new(Message).Init([]byte("this is a message"), ""))
runtime.Gosched()
time.Sleep(1 * time.Second)
So(atomic.LoadInt64(&(producer.(*Firehose).client.(*fakefirehose).count)), ShouldEqual, 1)
So(producer.(*Firehose).getMsgCount(), ShouldEqual, 1)
})
})
}
// Mocks for aws Firehose.
// This implements github.com/aws/aws-sdk-go/service/firehose/firehoseiface.FirehoseAPI
type fakefirehose struct {
count int64
}
func (f *fakefirehose) CreateDeliveryStreamRequest(*firehose.CreateDeliveryStreamInput) (*request.Request, *firehose.CreateDeliveryStreamOutput) {
return nil, nil
}
func (f *fakefirehose) CreateDeliveryStream(*firehose.CreateDeliveryStreamInput) (*firehose.CreateDeliveryStreamOutput, error) {
return nil, nil
}
func (f *fakefirehose) DeleteDeliveryStreamRequest(*firehose.DeleteDeliveryStreamInput) (*request.Request, *firehose.DeleteDeliveryStreamOutput) {
return nil, nil
}
func (f *fakefirehose) DeleteDeliveryStream(*firehose.DeleteDeliveryStreamInput) (*firehose.DeleteDeliveryStreamOutput, error) {
return nil, nil
}
func (f *fakefirehose) DescribeDeliveryStreamRequest(*firehose.DescribeDeliveryStreamInput) (*request.Request, *firehose.DescribeDeliveryStreamOutput) {
return nil, nil
}
func (f *fakefirehose) DescribeDeliveryStream(*firehose.DescribeDeliveryStreamInput) (*firehose.DescribeDeliveryStreamOutput, error) {
return nil, nil
}
func (f *fakefirehose) ListDeliveryStreamsRequest(*firehose.ListDeliveryStreamsInput) (*request.Request, *firehose.ListDeliveryStreamsOutput) {
return nil, nil
}
func (f *fakefirehose) ListDeliveryStreams(*firehose.ListDeliveryStreamsInput) (*firehose.ListDeliveryStreamsOutput, error) {
return nil, nil
}
func (f *fakefirehose) PutRecordRequest(*firehose.PutRecordInput) (*request.Request, *firehose.PutRecordOutput) {
return nil, nil
}
func (f *fakefirehose) PutRecord(*firehose.PutRecordInput) (*firehose.PutRecordOutput, error) {
return nil, nil
}
func (f *fakefirehose) PutRecordBatchRequest(*firehose.PutRecordBatchInput) (*request.Request, *firehose.PutRecordBatchOutput) {
return nil, nil
}
func (f *fakefirehose) PutRecordBatch(*firehose.PutRecordBatchInput) (*firehose.PutRecordBatchOutput, error) {
atomic.AddInt64(&(f.count), 1)
return &firehose.PutRecordBatchOutput{
RequestResponses: []*firehose.PutRecordBatchResponseEntry{
&firehose.PutRecordBatchResponseEntry{},
},
}, nil
}
func (f *fakefirehose) UpdateDestinationRequest(*firehose.UpdateDestinationInput) (*request.Request, *firehose.UpdateDestinationOutput) {
return nil, nil
}
func (f *fakefirehose) UpdateDestination(*firehose.UpdateDestinationInput) (*firehose.UpdateDestinationOutput, error) {
return nil, nil
}