Skip to content

Commit

Permalink
Merge pull request #100 from moleculer-go/develop
Browse files Browse the repository at this point in the history
TCP transporter
  • Loading branch information
pentateu authored Apr 12, 2024
2 parents 81c74ac + b4672dd commit 62e0322
Show file tree
Hide file tree
Showing 20 changed files with 1,848 additions and 120 deletions.
69 changes: 55 additions & 14 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,33 +55,71 @@ func mergeConfigs(baseConfig moleculer.Config, userConfig []*moleculer.Config) m
if config.StrategyFactory != nil {
baseConfig.StrategyFactory = config.StrategyFactory
}
if config.DisableInternalMiddlewares {
baseConfig.DisableInternalMiddlewares = config.DisableInternalMiddlewares
if config.UpdateNodeMetricsFrequency != 0 {
baseConfig.UpdateNodeMetricsFrequency = config.UpdateNodeMetricsFrequency
}
if config.DisableInternalServices {
baseConfig.DisableInternalServices = config.DisableInternalServices
if config.HeartbeatFrequency != 0 {
baseConfig.HeartbeatFrequency = config.HeartbeatFrequency
}
if config.HeartbeatTimeout != 0 {
baseConfig.HeartbeatTimeout = config.HeartbeatTimeout
}
if config.OfflineCheckFrequency != 0 {
baseConfig.OfflineCheckFrequency = config.OfflineCheckFrequency
}
if config.OfflineTimeout != 0 {
baseConfig.OfflineTimeout = config.OfflineTimeout
}
if config.NeighboursCheckTimeout != 0 {
baseConfig.NeighboursCheckTimeout = config.NeighboursCheckTimeout
}
if config.WaitForDependenciesTimeout != 0 {
baseConfig.WaitForDependenciesTimeout = config.WaitForDependenciesTimeout
}
if config.Middlewares != nil {
baseConfig.Middlewares = config.Middlewares
}
if config.Namespace != "" {
baseConfig.Namespace = config.Namespace
}
if config.RequestTimeout != 0 {
baseConfig.RequestTimeout = config.RequestTimeout
}
if config.MCallTimeout != 0 {
baseConfig.MCallTimeout = config.MCallTimeout
}
if config.RetryPolicy != nil {
baseConfig.RetryPolicy = config.RetryPolicy
}
if config.MaxCallLevel != 0 {
baseConfig.MaxCallLevel = config.MaxCallLevel
}
if config.Metrics {
baseConfig.Metrics = config.Metrics
}

if config.MetricsRate > 0 {
baseConfig.MetricsRate = config.MetricsRate
}

if config.DisableInternalServices {
baseConfig.DisableInternalServices = config.DisableInternalServices
}
if config.DisableInternalMiddlewares {
baseConfig.DisableInternalMiddlewares = config.DisableInternalMiddlewares
}
if config.DontWaitForNeighbours {
baseConfig.DontWaitForNeighbours = config.DontWaitForNeighbours
}

if config.Middlewares != nil {
baseConfig.Middlewares = config.Middlewares
if config.WaitForNeighboursInterval != 0 {
baseConfig.WaitForNeighboursInterval = config.WaitForNeighboursInterval
}
if config.RequestTimeout != 0 {
baseConfig.RequestTimeout = config.RequestTimeout
if config.Created != nil {
baseConfig.Created = config.Created
}

if config.Namespace != "" {
baseConfig.Namespace = config.Namespace
if config.Started != nil {
baseConfig.Started = config.Started
}
if config.Stopped != nil {
baseConfig.Stopped = config.Stopped
}
}
}
Expand Down Expand Up @@ -330,6 +368,9 @@ func (broker *ServiceBroker) waitForService(service string) error {
break
}
if time.Since(start) > broker.config.WaitForDependenciesTimeout {
broker.logger.Debug("Time:", time.Since(start))
broker.logger.Debug("WaitForDependenciesTimeout:", broker.config.WaitForDependenciesTimeout)

err := errors.New("waitForService() - Timeout ! service: " + service)
broker.logger.Error(err)
return err
Expand Down
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/procfs v0.0.0-20190503130316-740c07785007 // indirect
github.com/segmentio/kafka-go v0.4.18
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/sirupsen/logrus v1.4.2
github.com/spf13/cobra v0.0.3
github.com/spf13/viper v1.3.2
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271
github.com/tidwall/gjson v1.9.3
github.com/tidwall/sjson v1.0.4
github.com/tklauser/go-sysconf v0.3.13 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.etcd.io/bbolt v1.3.2 // indirect
go.mongodb.org/mongo-driver v1.5.2
golang.org/x/sys v0.0.0-20210503173754-0981d6026fa6 // indirect
golang.org/x/net v0.0.0-20201021035429-f5854403a974
)
15 changes: 13 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
Expand Down Expand Up @@ -167,6 +169,8 @@ github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/segmentio/kafka-go v0.4.18 h1:/LwffTZgFnfjgkUu1ZzHTwJJ39vW77wwUA0J6ftBbGw=
github.com/segmentio/kafka-go v0.4.18/go.mod h1:19+Eg7KwrNKy/PFhiIthEPkO8k+ac7/ZYXwYM9Df10w=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
Expand Down Expand Up @@ -201,6 +205,10 @@ github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/sjson v1.0.4 h1:UcdIRXff12Lpnu3OLtZvnc03g4vH2suXDXhBwBqmzYg=
github.com/tidwall/sjson v1.0.4/go.mod h1:bURseu1nuBkFpIES5cz6zBtjmYeOQmEESshn7VpF15Y=
github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08lq3r4=
github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5IJePewFCGVEa0=
github.com/tklauser/numcpus v0.7.0 h1:yjuerZP127QG9m5Zh/mSO4wqurYil27tHrqwRoRjpr4=
github.com/tklauser/numcpus v0.7.0/go.mod h1:bb6dMVcj8A42tSE7i32fsIUCbQNllK5iDguyOZRUzAY=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
Expand All @@ -213,6 +221,8 @@ github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.etcd.io/bbolt v1.3.2 h1:Z/90sZLPOeCy2PwprqkFa25PdkusRzaj9P8zm/KNyvk=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.mongodb.org/mongo-driver v1.5.2 h1:AsxOLoJTgP6YNM0fXWw4OjdluYmWzQYp+lFJL7xu9fU=
Expand Down Expand Up @@ -254,13 +264,14 @@ golang.org/x/sys v0.0.0-20190419153524-e8e3143a4f4a/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190531175056-4c3a928424d2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210503173754-0981d6026fa6 h1:cdsMqa2nXzqlgs183pHxtvoVwU7CyzaCTAUOg94af4c=
golang.org/x/sys v0.0.0-20210503173754-0981d6026fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ=
Expand Down
28 changes: 25 additions & 3 deletions moleculer.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ type Config struct {
Transporter string
TransporterFactory TransporterFactoryFunc
StrategyFactory StrategyFactoryFunc
UpdateNodeMetricsFrequency time.Duration
HeartbeatFrequency time.Duration
HeartbeatTimeout time.Duration
OfflineCheckFrequency time.Duration
Expand All @@ -133,7 +134,7 @@ type Config struct {
Namespace string
RequestTimeout time.Duration
MCallTimeout time.Duration
RetryPolicy RetryPolicy
RetryPolicy *RetryPolicy
MaxCallLevel int
Metrics bool
MetricsRate float32
Expand All @@ -153,6 +154,7 @@ var DefaultConfig = Config{
LogFormat: "TEXT",
DiscoverNodeID: discoverNodeID,
Transporter: "MEMORY",
UpdateNodeMetricsFrequency: 5 * time.Second,
HeartbeatFrequency: 5 * time.Second,
HeartbeatTimeout: 15 * time.Second,
OfflineCheckFrequency: 20 * time.Second,
Expand All @@ -168,7 +170,7 @@ var DefaultConfig = Config{
Started: func() {},
Stopped: func() {},
MaxCallLevel: 100,
RetryPolicy: RetryPolicy{
RetryPolicy: &RetryPolicy{
Enabled: false,
},
RequestTimeout: 3 * time.Second,
Expand Down Expand Up @@ -223,14 +225,23 @@ type Node interface {
GetID() string
ExportAsMap() map[string]interface{}
IsAvailable() bool
GetIpList() []string
GetPort() int
Available()
Unavailable()
IsExpired(timeout time.Duration) bool
Update(id string, info map[string]interface{}) (bool, []map[string]interface{})

UpdateInfo(info map[string]interface{}) []map[string]interface{}
IncreaseSequence()
HeartBeat(heartbeat map[string]interface{})
Publish(service map[string]interface{})
GetUdpAddress() string
GetSequence() int64
GetCpuSequence() int64
GetCpu() int64
IsLocal() bool
UpdateMetrics()
GetHostname() string
}

type Options struct {
Expand All @@ -250,6 +261,17 @@ type Context interface {
Meta() Payload
}

type ForEachNodeFunc func(node Node) bool
type Registry interface {
GetNodeByID(nodeID string) Node
AddOfflineNode(nodeID, hostname, ipAddress string, port int) Node
ForEachNode(ForEachNodeFunc)
DisconnectNode(nodeID string)
RemoteNodeInfoReceived(message Payload)
GetLocalNode() Node
GetNodeByAddress(host string) Node
}

type BrokerContext interface {
Call(actionName string, params interface{}, opts ...Options) chan Payload
Emit(eventName string, params interface{}, groups ...string)
Expand Down
Loading

0 comments on commit 62e0322

Please sign in to comment.