Skip to content

Commit

Permalink
Merge pull request #256 from sipcapture/reconnect
Browse files Browse the repository at this point in the history
fix reping
  • Loading branch information
adubovikov authored Aug 10, 2023
2 parents 763e5ae + 1ba45d5 commit 851dedc
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 1 deletion.
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/sipcapture/heplify/sniffer"
)

const version = "heplify 1.65.3"
const version = "heplify 1.65.5"

func createFlags() {

Expand Down
9 changes: 9 additions & 0 deletions publish/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ func (fo *FileOutputer) Output(msg []byte) {
}
}

func (fo *FileOutputer) SendPingPacket(msg []byte) {
h, err := DecodeHEP(msg)
if err == nil {
logp.Info("%s\n", h.String())
} else {
logp.Warn("%s", err)
}
}

func NewFileOutputer() (*FileOutputer, error) {
fo := &FileOutputer{}
return fo, nil
Expand Down
21 changes: 21 additions & 0 deletions publish/hep.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type HEPOutputer struct {
hepQueue chan []byte
addr []string
client []HEPConn
msgPing []byte
}

func NewHEPOutputer(serverAddr string) (*HEPOutputer, error) {
Expand Down Expand Up @@ -57,6 +58,7 @@ func (h *HEPOutputer) ReConnect(n int) (err error) {
return err
}
h.client[n].writer.Reset(h.client[n].conn)
h.ReSendPingPacket()
return err
}

Expand Down Expand Up @@ -84,6 +86,25 @@ func (h *HEPOutputer) Output(msg []byte) {
h.hepQueue <- msg
}

func (h *HEPOutputer) SendPingPacket(msg []byte) {

if h.msgPing == nil {
h.msgPing = make([]byte, len(msg))
}

copy(h.msgPing, msg)

h.hepQueue <- h.msgPing
}

func (h *HEPOutputer) ReSendPingPacket() {

if h.msgPing != nil {
logp.Debug("collector", "send ping packet")
h.hepQueue <- h.msgPing
}
}

func (h *HEPOutputer) Send(msg []byte) {
for n := range h.addr {
h.client[n].writer.Write(msg)
Expand Down
20 changes: 20 additions & 0 deletions publish/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

type Outputer interface {
Output(msg []byte)
SendPingPacket(msg []byte)
}

type Publisher struct {
Expand All @@ -36,6 +37,15 @@ func (pub *Publisher) output(msg []byte) {
pub.outputer.Output(msg)
}

func (pub *Publisher) setHEPPing(msg []byte) {
defer func() {
if err := recover(); err != nil {
logp.Err("recover setHEPPing %v", err)
}
}()
pub.outputer.SendPingPacket(msg)
}

func (pub *Publisher) Start(pq chan *decoder.Packet) {
for pkt := range pq {

Expand All @@ -45,12 +55,22 @@ func (pub *Publisher) Start(pq chan *decoder.Packet) {
if pkt.Version == 100 {
pub.output(pkt.Payload)
logp.Debug("publisher", "sent hep message from collector")
} else if pkt.Version == 0 {
//this is PING
msg, err := EncodeHEP(pkt)
if err != nil {
logp.Warn("%v", err)
continue
}
pub.setHEPPing(msg)
logp.Debug("publisher", "sent hep ping from collector")
} else {
msg, err := EncodeHEP(pkt)
if err != nil {
logp.Warn("%v", err)
continue
}

pub.output(msg)
}
}
Expand Down

0 comments on commit 851dedc

Please sign in to comment.