From 24c27903ea8015bfff8f7cd8a3ad5d963d3b2cb9 Mon Sep 17 00:00:00 2001 From: Alexandr Dubovikov Date: Tue, 8 Aug 2023 16:16:14 +0200 Subject: [PATCH 1/2] fix reping --- main.go | 2 +- publish/file.go | 9 +++++++++ publish/hep.go | 21 +++++++++++++++++++++ publish/publisher.go | 20 ++++++++++++++++++++ 4 files changed, 51 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index 311504a..b20f320 100644 --- a/main.go +++ b/main.go @@ -12,7 +12,7 @@ import ( "github.com/sipcapture/heplify/sniffer" ) -const version = "heplify 1.65.3" +const version = "heplify 1.65.4" func createFlags() { diff --git a/publish/file.go b/publish/file.go index cc79425..4e63b61 100644 --- a/publish/file.go +++ b/publish/file.go @@ -16,6 +16,15 @@ func (fo *FileOutputer) Output(msg []byte) { } } +func (fo *FileOutputer) SendPingPacket(msg []byte) { + h, err := DecodeHEP(msg) + if err == nil { + logp.Info("%s\n", h.String()) + } else { + logp.Warn("%s", err) + } +} + func NewFileOutputer() (*FileOutputer, error) { fo := &FileOutputer{} return fo, nil diff --git a/publish/hep.go b/publish/hep.go index 7db8342..9788158 100644 --- a/publish/hep.go +++ b/publish/hep.go @@ -21,6 +21,7 @@ type HEPOutputer struct { hepQueue chan []byte addr []string client []HEPConn + msgPing []byte } func NewHEPOutputer(serverAddr string) (*HEPOutputer, error) { @@ -57,6 +58,7 @@ func (h *HEPOutputer) ReConnect(n int) (err error) { return err } h.client[n].writer.Reset(h.client[n].conn) + h.ReSendPingPacket() return err } @@ -84,6 +86,25 @@ func (h *HEPOutputer) Output(msg []byte) { h.hepQueue <- msg } +func (h *HEPOutputer) SendPingPacket(msg []byte) { + + if h.msgPing == nil { + h.msgPing = make([]byte, len(msg)) + } + + copy(h.msgPing, msg) + + h.hepQueue <- h.msgPing +} + +func (h *HEPOutputer) ReSendPingPacket() { + + if h.msgPing != nil { + logp.Debug("collector", "send ping packet") + h.hepQueue <- h.msgPing + } +} + func (h *HEPOutputer) Send(msg []byte) { for n := range h.addr { h.client[n].writer.Write(msg) diff --git a/publish/publisher.go b/publish/publisher.go index dbe27c3..64f36d7 100644 --- a/publish/publisher.go +++ b/publish/publisher.go @@ -10,6 +10,7 @@ import ( type Outputer interface { Output(msg []byte) + SendPingPacket(msg []byte) } type Publisher struct { @@ -36,6 +37,15 @@ func (pub *Publisher) output(msg []byte) { pub.outputer.Output(msg) } +func (pub *Publisher) setHEPPing(msg []byte) { + defer func() { + if err := recover(); err != nil { + logp.Err("recover setHEPPing %v", err) + } + }() + pub.outputer.SendPingPacket(msg) +} + func (pub *Publisher) Start(pq chan *decoder.Packet) { for pkt := range pq { @@ -45,12 +55,22 @@ func (pub *Publisher) Start(pq chan *decoder.Packet) { if pkt.Version == 100 { pub.output(pkt.Payload) logp.Debug("publisher", "sent hep message from collector") + } else if pkt.Version == 0 { + //this is PING + msg, err := EncodeHEP(pkt) + if err != nil { + logp.Warn("%v", err) + continue + } + pub.setHEPPing(msg) + logp.Debug("publisher", "sent hep ping from collector") } else { msg, err := EncodeHEP(pkt) if err != nil { logp.Warn("%v", err) continue } + pub.output(msg) } } From 1ba45d5ec41ae2a867896c462704ecd60f1208a5 Mon Sep 17 00:00:00 2001 From: Alexandr Dubovikov Date: Thu, 10 Aug 2023 16:31:10 +0200 Subject: [PATCH 2/2] Update main.go --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index b20f320..59b0fe3 100644 --- a/main.go +++ b/main.go @@ -12,7 +12,7 @@ import ( "github.com/sipcapture/heplify/sniffer" ) -const version = "heplify 1.65.4" +const version = "heplify 1.65.5" func createFlags() {