Skip to content
This repository has been archived by the owner on Nov 10, 2020. It is now read-only.

+ FIX : add the ping/ack msg to session.Pingack #40

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions service/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ func (this *service) processIncoming(msg message.Message) error {
func (this *service) processAcked(ackq *sessions.Ackqueue) {
for _, ackmsg := range ackq.Acked() {
// Let's get the messages from the saved message byte slices.

if len(ackmsg.Msgbuf) == 0 || len(ackmsg.Ackbuf) == 0 {
glog.Errorf("process/processAcked: Unable to decode new %s as ackmsg Msgbuf buf is %v or ackmsg Ackbuf buf is %v", ackmsg.Mtype, ackmsg.Msgbuf, ackmsg.Ackbuf)
Copy link

@suyashkumar suyashkumar Sep 7, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi there @ljy2010a! Thanks so much for this PR, just applied it on my fork of this repo. However keeping the server alive via PINGREQs seems to throw this error a lot. From what I can tell since it's a ping this is not a problem, correct? Figured I'd ask someone like you who knows MQTT better than I :)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's not an issue, I'll probably just make it:

if (len(ackmsg.Msgbuf) == 0 || len(ackmsg.Ackbuf) == 0) && ackmsg.Mtype!="PINGREQ"

But so far I have not had the i/o timeout issues I was having before, so I really like this (thanks for this PR!)

Copy link
Author

@ljy2010a ljy2010a Sep 8, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i/o timeout issues is case by the server setTimeout , when you have a heartbeat or send message interval less than timeout,it should not apper

the code here is just check the buffer len more than 0,that the message library
miss

you should not get the 0 buffer , so I fill the PINGRESP it in ackqueue.go like below

but if you get the 0 buffer , you should check the message that client send ,
or the server get message
, it may have a bug

by the way , i did not use this in Industrial env

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here is my test example

package main

import (
    "flag"
    "log"
    "os"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
    "github.com/surge/glog"
    "github.com/surgemq/message"
    "github.com/surgemq/surgemq/service"
)

var addr = "tcp://127.0.0.1:12345"

func main() {

    flag.Parse()
    defer glog.Flush()

    go func() {
        srv := service.Server{}
        glog.Info("starting MQTT server at :12345")
        glog.Fatal(srv.ListenAndServe(addr))
    }()

    // wait a bit, the internal MQTT server is still starting up
    time.Sleep(time.Second)
    surgemqClient()
    // pahoClient()
}

func surgemqClient() {
    client := &service.Client{}
    msg := message.NewConnectMessage()
    msg.SetVersion(4)
    KeepAlive := 40
    msg.SetKeepAlive(uint16(KeepAlive))
    msg.SetCleanSession(true)
    // make the byte like Paho
    msg.SetClientId([]byte("Paho"))
    if err := client.Connect(addr, msg); err != nil {
        glog.Fatal(err)
    }

    go heartbeatProcess(client, KeepAlive)

    glog.Infoln("connected to port 12345")

    i := 0
    for _ = range time.Tick(time.Second * 1) {
        i++
        glog.Info(i)
    }
}

func pahoClient() {
    mqtt.ERROR = log.New(os.Stdout, "", 0)
    opts := mqtt.NewClientOptions().AddBroker(addr).SetClientID("Paho")
    opts.SetKeepAlive(2 * time.Second)
    opts.SetPingTimeout(1 * time.Second)

    c := mqtt.NewClient(opts)
    if token := c.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }

    i := 0
    for _ = range time.Tick(time.Second * 1) {
        i++
        glog.Info(i)
    }
}

func heartbeatProcess(c *service.Client, KeepAlive int) {
    for _ = range time.Tick(time.Duration(KeepAlive) * time.Second) {
        c.Ping(func(msg, ack message.Message, err error) error {
            glog.Infof("Ping \n")
            return nil
        })
    }
}

continue
}

msg, err := ackmsg.Mtype.New()
if err != nil {
glog.Errorf("process/processAcked: Unable to creating new %s message: %v", ackmsg.Mtype, err)
Expand Down
12 changes: 12 additions & 0 deletions sessions/ackqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ func (this *Ackqueue) Wait(msg message.Message, onComplete interface{}) error {
State: message.RESERVED,
OnComplete: onComplete,
}
ml := msg.Len()
this.ping.Msgbuf = make([]byte, ml)
_, err := msg.Encode(this.ping.Msgbuf)
if err != nil {
return err
}

default:
return errWaitMessage
Expand Down Expand Up @@ -165,6 +171,12 @@ func (this *Ackqueue) Ack(msg message.Message) error {
case message.PINGRESP:
if this.ping.Mtype == message.PINGREQ {
this.ping.State = message.PINGRESP
ml := msg.Len()
this.ping.Ackbuf = make([]byte, ml)
_, err := msg.Encode(this.ping.Ackbuf)
if err != nil {
return err
}
}

default:
Expand Down