Skip to content

Commit

Permalink
Sub2の処理を削除。Subの内容に変更した。
Browse files Browse the repository at this point in the history
  • Loading branch information
acro-takanori committed May 19, 2015
1 parent e42a1d0 commit b32dea4
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 57 deletions.
15 changes: 10 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ This can benchmark the throughput for publishing and subscribing.

Supported benchmark pattern is:
* Parallel publish from clients
* Parallel subscribe from clients
* Parallel subscribe from clients with publishing

## Getting started
### Installation
Expand Down Expand Up @@ -36,10 +36,10 @@ Result : broker=tcp://192.168.1.100:1883, clients=10, totalCount=1000, duration=
### Subscribe
* Precondition
* The MQTT Broker is started.
* The MQTT broker will keep the messages. It will be published the message with retained.
* Publishing to MQTT Broker.
```
(Keep the messages before subscribing)
$ mqtt-bench -broker=tcp://192.168.1.100:1883 -action=pub -retain=true
(Publish the messages while subscribing)
$ mqtt-bench -broker=tcp://192.168.1.100:1883 -action=pub -count=10000
$ mqtt-bench -broker=tcp://192.168.1.100:1883 -action=sub
2015-04-04 12:50:27.188396 +0900 JST Start benchmark
Expand All @@ -48,10 +48,15 @@ $ mqtt-bench -broker=tcp://192.168.1.100:1883 -action=sub
Result : broker=tcp://192.168.1.100:1883, clients=10, totalCount=1000, duration=287ms, throughput=3484.32messages/sec
```

If the following message is output to the console, the count is over limit.
So, please set ```-intervalTime``` option.
```
panic: Subscribe error : Not finished in the max count. It may not be received the message.
```
## Usage
```
Usage of mqtt-bench
-action="p|pub|publish or s|sub|subscribe" : Publish or Subscribe (required)
-action="p|pub or s|sub" : Publish or Subscribe (required)
-broker="tcp://{host}:{port}" : URI of MQTT broker (required)
-broker-password="" : Password for connecting to the MQTT broker
-broker-username="" : Username for connecting to the MQTT broker
Expand Down
56 changes: 4 additions & 52 deletions mqtt-bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var DefaultHandlerResults []*SubscribeResult
// 実行オプション
type ExecOptions struct {
Broker string // Broker URI
Qos byte // QoS(0/1/2)
Qos byte // QoS(0|1|2)
Retain bool // Retain
Topic string // Topicのルート
Username string // ユーザID
Expand Down Expand Up @@ -129,54 +129,10 @@ func Publish(client *MQTT.Client, topic string, qos byte, retain bool, message s
}
}

// 全クライアントに対して、subscribeの処理を行う。
// 指定されたカウント数分、メッセージ取得を行う(メッセージが取得できない場合もカウントする)。
func SubscribeAllClient(clients []*MQTT.Client, opts ExecOptions, param ...string) int {
wg := new(sync.WaitGroup)

totalCount := 0
totalRecieve := 0
for id := 0; id < len(clients); id++ {
wg.Add(1)

client := clients[id]

go func(clientId int) {
defer wg.Done()

for index := 0; index < opts.Count; index++ {
topic := fmt.Sprintf(opts.Topic+"/%d", clientId)

if Debug {
fmt.Printf("Subscribe : id=%d, count=%d, topic=%s\n", clientId, index, topic)
}

result := Subscribe(client, topic, opts.Qos)
// 実行数をカウント
totalCount++
// メッセージ数をカウント
totalRecieve += result.Count

if opts.IntervalTime > 0 {
time.Sleep(time.Duration(opts.IntervalTime) * time.Millisecond)
}
}
}(id)
}

wg.Wait()

if totalRecieve == 0 {
fmt.Printf("Subscribe warning : recieved no message.\n")
}

return totalCount
}

// 全クライアントに対して、subscribeの処理を行う。
// 指定されたカウント数分、メッセージを受信待ちする(メッセージが取得できない場合はカウントされない)。
// この処理では、Publishし続けながら、Subscribeの処理を行う。
func SubscribeAllClient2(clients []*MQTT.Client, opts ExecOptions, param ...string) int {
func SubscribeAllClient(clients []*MQTT.Client, opts ExecOptions, param ...string) int {
wg := new(sync.WaitGroup)

results := make([]*SubscribeResult, len(clients))
Expand Down Expand Up @@ -339,7 +295,7 @@ func Disconnect(client *MQTT.Client) {

func main() {
broker := flag.String("broker", "tcp://{host}:{port}", "URI of MQTT broker (required)")
action := flag.String("action", "p|pub or s|sub or s2|sub2", "Publish or Subscribe or Subscribe(with publishing) (required)")
action := flag.String("action", "p|pub or s|sub", "Publish or Subscribe or Subscribe(with publishing) (required)")
qos := flag.Int("qos", 0, "MQTT QoS(0|1|2)")
retain := flag.Bool("retain", false, "MQTT Retain")
topic := flag.String("topic", BASE_TOPIC, "Base topic")
Expand Down Expand Up @@ -372,11 +328,9 @@ func main() {
method = "pub"
} else if *action == "s" || *action == "sub" {
method = "sub"
} else if *action == "s2" || *action == "sub2" {
method = "sub2"
}

if method != "pub" && method != "sub" && method != "sub2" {
if method != "pub" && method != "sub" {
fmt.Printf("Invalid argument : -action -> %s\n", *action)
return
}
Expand All @@ -402,7 +356,5 @@ func main() {
Execute(PublishAllClient, execOpts)
case "sub":
Execute(SubscribeAllClient, execOpts)
case "sub2":
Execute(SubscribeAllClient2, execOpts)
}
}

0 comments on commit b32dea4

Please sign in to comment.