Skip to content

Commit

Permalink
Merge pull request #12 from huojiao2006/dev
Browse files Browse the repository at this point in the history
add NewIQueue and NewIPubSub function
  • Loading branch information
huojiao2006 authored Jul 18, 2019
2 parents 92158d9 + aa2264b commit 0aead1d
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 87 deletions.
50 changes: 43 additions & 7 deletions client/client.go → queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Use of this source code is governed by a Apache license
// that can be found in the LICENSE file.

package client
package queue

import (
"encoding/json"
Expand All @@ -17,13 +17,49 @@ import (
"openpitrix.io/logger"

i "openpitrix.io/libqueue"
etcdq "openpitrix.io/libqueue/etcd"
redisq "openpitrix.io/libqueue/redis"
qetcd "openpitrix.io/libqueue/etcd"
qredis "openpitrix.io/libqueue/redis"
)

func NewIQueue(queueType string, qclient *i.IClient) (i.IQueue, error) {
var iqueue i.IQueue
switch queueType {
case "etcd":
etcdQueue := qetcd.EtcdQueue{}
iqueue = &etcdQueue
iqueue.SetClient(qclient)
return iqueue, nil
case "redis":
redisQueue := qredis.RedisQueue{}
iqueue = &redisQueue
iqueue.SetClient(qclient)
return iqueue, nil
default:
return nil, fmt.Errorf("unsupported Queue Type [%s]", queueType)
}
}

func NewIPubSub(pubsubType string, qclient *i.IClient) (i.IPubSub, error) {
var ipubsub i.IPubSub
switch pubsubType {
case "etcd":
etcdPubSub := qetcd.EtcdPubSub{}
ipubsub = &etcdPubSub
ipubsub.SetClient(qclient)
return ipubsub, nil
case "redis":
redisPubSub := qredis.RedisPubSub{}
ipubsub = &redisPubSub
ipubsub.SetClient(qclient)
return ipubsub, nil
default:
return nil, fmt.Errorf("unsupported Queue Type [%s]", pubsubType)
}
}

func NewIClient(pubsubType string, configMap map[string]interface{}) (i.IClient, error) {
if configMap == nil {
return nil, fmt.Errorf("not provide client configuration info.")
return nil, fmt.Errorf("not provide queue configuration info.")
}

switch pubsubType {
Expand All @@ -42,13 +78,13 @@ func NewIClient(pubsubType string, configMap map[string]interface{}) (i.IClient,
DialTimeout: dialTimeout,
})
if err != nil {
logger.Errorf(nil, "new etcd client failed, err=%+v", err)
logger.Errorf(nil, "new etcd queue failed, err=%+v", err)
return nil, err
}
cli.KV = namespace.NewKV(cli.KV, "")
cli.Watcher = namespace.NewWatcher(cli.Watcher, "")
cli.Lease = namespace.NewLease(cli.Lease, "")
return etcdq.EtcdClient{*cli}, err
return qetcd.EtcdClient{*cli}, err

case "redis":
cfg := loadConf4Redis(configMap)
Expand Down Expand Up @@ -77,7 +113,7 @@ func NewIClient(pubsubType string, configMap map[string]interface{}) (i.IClient,
return nil, err
}

return redisq.RedisClient{*cli}, nil
return qredis.RedisClient{*cli}, nil
default:
return nil, fmt.Errorf("unsupported Queue Type [%s]", pubsubType)
}
Expand Down
36 changes: 9 additions & 27 deletions test/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,17 @@ import (

"openpitrix.io/logger"

i "openpitrix.io/libqueue"
c "openpitrix.io/libqueue/client"
etcdps "openpitrix.io/libqueue/etcd"
redisps "openpitrix.io/libqueue/redis"
q "openpitrix.io/libqueue/queue"
)

func TestPublish4Redis(t *testing.T) {
pubsubConnStr := "redis://192.168.0.6:6379"
pubsubType := "redis"
pubsubConfigMap := map[string]interface{}{
"connStr": pubsubConnStr}
iClient, _ := c.NewIClient(pubsubType, pubsubConfigMap)
iClient, _ := q.NewIClient(pubsubType, pubsubConfigMap)
ipubsub, _ := q.NewIPubSub(pubsubType, &iClient)

redisPubSub := redisps.RedisPubSub{}
var ipubsub i.IPubSub
ipubsub = &redisPubSub
ipubsub.SetClient(&iClient)
ipubsub.SetChannel("channel1")
ipubsub.Publish("data1")

Expand All @@ -38,12 +32,8 @@ func TestReceiveMessage4Redis(t *testing.T) {
pubsubType := "redis"
pubsubConfigMap := map[string]interface{}{
"connStr": pubsubConnStr}
iClient, _ := c.NewIClient(pubsubType, pubsubConfigMap)

redisPubSub := redisps.RedisPubSub{}
var ipubsub i.IPubSub
ipubsub = &redisPubSub
ipubsub.SetClient(&iClient)
iClient, _ := q.NewIClient(pubsubType, pubsubConfigMap)
ipubsub, _ := q.NewIPubSub(pubsubType, &iClient)
ipubsub.SetChannel("channel1")

msgChan := ipubsub.ReceiveMessage()
Expand Down Expand Up @@ -74,12 +64,8 @@ func TestPublish4Etcd(t *testing.T) {
pubsubType := "etcd"
pubsubConfigMap := map[string]interface{}{
"connStr": pubsubConnStr}
iClient, _ := c.NewIClient(pubsubType, pubsubConfigMap)

etcdPubSub := etcdps.EtcdPubSub{}
var ipubsub i.IPubSub
ipubsub = &etcdPubSub
ipubsub.SetClient(&iClient)
iClient, _ := q.NewIClient(pubsubType, pubsubConfigMap)
ipubsub, _ := q.NewIPubSub(pubsubType, &iClient)
ipubsub.SetChannel("channel1")

ipubsub.Publish("data1")
Expand All @@ -92,12 +78,8 @@ func TestReceiveMessage4Etcd(t *testing.T) {
pubsubType := "etcd"
pubsubConfigMap := map[string]interface{}{
"connStr": pubsubConnStr}
iClient, _ := c.NewIClient(pubsubType, pubsubConfigMap)

etcdPubSub := etcdps.EtcdPubSub{}
var ipubsub i.IPubSub
ipubsub = &etcdPubSub
ipubsub.SetClient(&iClient)
iClient, _ := q.NewIClient(pubsubType, pubsubConfigMap)
ipubsub, _ := q.NewIPubSub(pubsubType, &iClient)
ipubsub.SetChannel("channel1")

msgChan := ipubsub.ReceiveMessage()
Expand Down
40 changes: 13 additions & 27 deletions test/queue_performance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@ import (
"github.com/coreos/etcd/version"

i "openpitrix.io/libqueue"
c "openpitrix.io/libqueue/client"
etcdq "openpitrix.io/libqueue/etcd"
redisq "openpitrix.io/libqueue/redis"
q "openpitrix.io/libqueue/queue"
)

func enqueue(queue i.IQueue) int {
val := fmt.Sprintf("%d", rand.Intn(10000))
err := queue.Enqueue(val)
//logger.Infof(nil, "enqueue client.topic=%s", client.topic)
//logger.Infof(nil, "enqueue queue.topic=%s", queue.topic)
if err != nil {
//logger.Infof(nil, "enqueue error=%+v", err)
return 1
Expand All @@ -33,7 +31,7 @@ func enqueue(queue i.IQueue) int {

func dequeue(queue i.IQueue) int {
_, err := queue.Dequeue()
//logger.Infof(nil, "enqueue client.topic=%s", client.topic)
//logger.Infof(nil, "enqueue queue.topic=%s", queue.topic)
if err != nil {
return 1
}
Expand Down Expand Up @@ -104,11 +102,8 @@ func TestEnQueuePerf4Etcd(t *testing.T) {
pubsubConfigMap := map[string]interface{}{
"connStr": pubsubConnStr,
}
iClient, _ := c.NewIClient(pubsubType, pubsubConfigMap)
etcdQueue := etcdq.EtcdQueue{}
var iqueue i.IQueue
iqueue = &etcdQueue
iqueue.SetClient(&iClient)
iClient, _ := q.NewIClient(pubsubType, pubsubConfigMap)
iqueue, _ := q.NewIQueue(pubsubType, &iClient)

for i := 0; i < TestingTasksCnt; i++ {
topicName := "notification_" + strconv.Itoa(int(i))
Expand All @@ -132,11 +127,8 @@ func TestDeQueuePerf4Etcd(t *testing.T) {
pubsubConfigMap := map[string]interface{}{
"connStr": pubsubConnStr,
}
iClient, _ := c.NewIClient(pubsubType, pubsubConfigMap)
etcdQueue := etcdq.EtcdQueue{}
var iqueue i.IQueue
iqueue = &etcdQueue
iqueue.SetClient(&iClient)
iClient, _ := q.NewIClient(pubsubType, pubsubConfigMap)
iqueue, _ := q.NewIQueue(pubsubType, &iClient)

for i := 0; i < TestingTasksCnt; i++ {
topicName := "notification_" + strconv.Itoa(int(i))
Expand All @@ -155,12 +147,9 @@ func TestEnQueuePerf4Redis(t *testing.T) {
pubsubConnStr := "redis://192.168.0.6:6379"
pubsubConfigMap := map[string]interface{}{
"connStr": pubsubConnStr}
iClient, _ := c.NewIClient("redis", pubsubConfigMap)

redisQueue := redisq.RedisQueue{}
var iqueue i.IQueue
iqueue = &redisQueue
iqueue.SetClient(&iClient)
pubsubType := "redis"
iClient, _ := q.NewIClient(pubsubType, pubsubConfigMap)
iqueue, _ := q.NewIQueue(pubsubType, &iClient)

for i := 0; i < TestingTasksCnt; i++ {
topicName := "notification_" + strconv.Itoa(int(i))
Expand All @@ -180,12 +169,9 @@ func TestDeQueuePerf4Redis(t *testing.T) {
pubsubConnStr := "redis://192.168.0.6:6379"
pubsubConfigMap := map[string]interface{}{
"connStr": pubsubConnStr}
iClient, _ := c.NewIClient("redis", pubsubConfigMap)

redisQueue := redisq.RedisQueue{}
var iqueue i.IQueue
iqueue = &redisQueue
iqueue.SetClient(&iClient)
pubsubType := "redis"
iClient, _ := q.NewIClient(pubsubType, pubsubConfigMap)
iqueue, _ := q.NewIQueue(pubsubType, &iClient)

for i := 0; i < TestingTasksCnt; i++ {
topicName := "notification_" + strconv.Itoa(int(i))
Expand Down
36 changes: 10 additions & 26 deletions test/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,38 +9,30 @@ import (

"openpitrix.io/logger"

i "openpitrix.io/libqueue"
c "openpitrix.io/libqueue/client"
etcdq "openpitrix.io/libqueue/etcd"
redisq "openpitrix.io/libqueue/redis"
q "openpitrix.io/libqueue/queue"
)

func TestEnqueue4Redis(t *testing.T) {
pubsubConnStr := "redis://192.168.0.6:6379"
pubsubType := "redis"
pubsubConfigMap := map[string]interface{}{
"connStr": pubsubConnStr}
iClient, _ := c.NewIClient(pubsubType, pubsubConfigMap)
iClient, _ := q.NewIClient(pubsubType, pubsubConfigMap)

redisQueue := redisq.RedisQueue{}
var iqueue i.IQueue
iqueue = &redisQueue
iqueue.SetClient(&iClient)
iqueue, _ := q.NewIQueue(pubsubType, &iClient)
iqueue.SetTopic("test_topic1")
iqueue.Enqueue("ssss")
}

func TestDequeue4Redis(t *testing.T) {
pubsubConnStr := "redis://192.168.0.6:6379"
pubsubType := "redis"
pubsubConfigMap := map[string]interface{}{
"connStr": pubsubConnStr}

iClient, _ := c.NewIClient("redis", pubsubConfigMap)
iClient, _ := q.NewIClient(pubsubType, pubsubConfigMap)
iqueue, _ := q.NewIQueue(pubsubType, &iClient)

redisQueue := redisq.RedisQueue{}
var iqueue i.IQueue
iqueue = &redisQueue
iqueue.SetClient(&iClient)
iqueue.SetTopic("test_topic1")
val, err := iqueue.Dequeue()
if err != nil {
Expand All @@ -56,12 +48,8 @@ func TestEnqueue4Etcd(t *testing.T) {
pubsubConfigMap := map[string]interface{}{
"connStr": pubsubConnStr,
}
iClient, _ := c.NewIClient(pubsubType, pubsubConfigMap)

etcdQueue := etcdq.EtcdQueue{}
var iqueue i.IQueue
iqueue = &etcdQueue
iqueue.SetClient(&iClient)
iClient, _ := q.NewIClient(pubsubType, pubsubConfigMap)
iqueue, _ := q.NewIQueue(pubsubType, &iClient)
iqueue.SetTopic("test_topic1")
iqueue.Enqueue("ssss")
}
Expand All @@ -72,12 +60,8 @@ func TestDequeue4Etcd(t *testing.T) {
pubsubConfigMap := map[string]interface{}{
"connStr": pubsubConnStr,
}
iClient, _ := c.NewIClient(pubsubType, pubsubConfigMap)

etcdQueue := etcdq.EtcdQueue{}
var iqueue i.IQueue
iqueue = &etcdQueue
iqueue.SetClient(&iClient)
iClient, _ := q.NewIClient(pubsubType, pubsubConfigMap)
iqueue, _ := q.NewIQueue(pubsubType, &iClient)
iqueue.SetTopic("test_topic1")

val, err := iqueue.Dequeue()
Expand Down

0 comments on commit 0aead1d

Please sign in to comment.