Skip to content

Commit

Permalink
feat: support connectivity check callback (#128)
Browse files Browse the repository at this point in the history
  • Loading branch information
mzz2017 authored Jan 11, 2024
1 parent fa1b7f6 commit 95a77e5
Show file tree
Hide file tree
Showing 15 changed files with 389 additions and 71 deletions.
11 changes: 10 additions & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/daeuniverse/dae-wing/graphql"
"github.com/daeuniverse/dae-wing/graphql/service/config"
"github.com/daeuniverse/dae-wing/webrender"
"github.com/daeuniverse/dae-wing/ws"
"github.com/golang-jwt/jwt/v5"
"github.com/graph-gophers/graphql-go/relay"
"github.com/rs/cors"
Expand Down Expand Up @@ -118,6 +119,7 @@ var (
}
mux := http.NewServeMux()
mux.Handle("/graphql", auth(cors.AllowAll().Handler(&relay.Handler{Schema: schema})))
mux.Handle("/ws", auth(cors.AllowAll().Handler(&ws.Handler{})))
if err = webrender.Handle(mux); err != nil {
errorExit(err)
}
Expand Down Expand Up @@ -209,7 +211,11 @@ func shouldReload() (ok bool, err error) {

func auth(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
authorization := strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer ")
authorization := r.Header.Get("Authorization")
if authorization == "" {
authorization = r.URL.Query().Get("authorization")
}
authorization = strings.TrimPrefix(authorization, "Bearer ")
var user db.User
token, err := jwt.Parse(authorization, func(token *jwt.Token) (interface{}, error) {
// Don't forget to validate the alg is what you expect:
Expand Down Expand Up @@ -238,6 +244,9 @@ func auth(next http.Handler) http.Handler {
ctx = context.WithValue(ctx, "user", &user)
}
}
w.Header().Set("x-auth-result", "1")
} else {
w.Header().Set("x-auth-result", "0")
}
next.ServeHTTP(w, r.WithContext(ctx))
})
Expand Down
2 changes: 1 addition & 1 deletion dae-core
Submodule dae-core updated 154 files
66 changes: 66 additions & 0 deletions dae/daemsg_producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package dae

import (
"container/list"
"context"

"github.com/daeuniverse/dae/control"
)

type DaeMsgProducer struct {
chMsg <-chan *control.Msg
// resetChMsg is used to re-assign chMsg.
resetChMsg chan struct{}
daeSubscriber chan *daeSubscriber
}

func NewDaeMsgProducer(chMsg <-chan *control.Msg) *DaeMsgProducer {
r := &DaeMsgProducer{
chMsg: chMsg,
resetChMsg: make(chan struct{}),
daeSubscriber: make(chan *daeSubscriber),
}
return r
}

func (r *DaeMsgProducer) ReassignChMsg(chMsg <-chan *control.Msg) {
r.chMsg = chMsg
r.resetChMsg <- struct{}{}
}

func (r *DaeMsgProducer) Run() {
subs := list.New().Init()
for {
select {
// New subscriber.
case sub := <-r.daeSubscriber:
subs.PushBack(sub)
// Reset pointer to r.chMsg.
case <-r.resetChMsg:

// Producer msg.
case msg := <-r.chMsg:
for node := subs.Front(); node != nil; node = node.Next() {
sub := node.Value.(*daeSubscriber)
select {
case <-sub.stop:
subs.Remove(node)
case sub.msgs <- msg:
default:
// Busy.
}
}
}
}
}

type daeSubscriber struct {
msgs chan<- *control.Msg
stop <-chan struct{}
}

func (r *DaeMsgProducer) Subscribe(ctx context.Context) <-chan *control.Msg {
c := make(chan *control.Msg, 10)
r.daeSubscriber <- &daeSubscriber{msgs: c, stop: ctx.Done()}
return c
}
46 changes: 29 additions & 17 deletions dae/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ var GracefullyExit = make(chan struct{})
var EmptyConfig *daeConfig.Config
var c *control.ControlPlane
var onceWaitingNetwork sync.Once
var ChMsg chan *control.Msg
var MsgProducer *DaeMsgProducer

func init() {
sections, err := config_parser.Parse(`global{} routing{}`)
Expand All @@ -43,10 +45,11 @@ func init() {
}

func ControlPlane() (*control.ControlPlane, error) {
if c == nil {
ctl := c
if ctl == nil {
return nil, ErrControlPlaneNotInit
}
return c, nil
return ctl, nil
}

func Run(log *logrus.Logger, conf *daeConfig.Config, externGeoDataDirs []string, disableTimestamp bool, dry bool) (err error) {
Expand All @@ -67,7 +70,10 @@ func Run(log *logrus.Logger, conf *daeConfig.Config, externGeoDataDirs []string,
}

// New c.
c, err = newControlPlane(log, nil, nil, conf, externGeoDataDirs)
ChMsg = make(chan *control.Msg, 10)
MsgProducer = NewDaeMsgProducer(ChMsg)
go MsgProducer.Run()
c, err = newControlPlane(log, nil, nil, conf, externGeoDataDirs, ChMsg)
if err != nil {
return err
}
Expand Down Expand Up @@ -144,8 +150,10 @@ loop:
// Only keep dns cache when ip version preference not change.
dnsCache = c.CloneDnsCache()
}
// New ChMsg.
newChMsg := make(chan *control.Msg, 10)
log.Warnln("[Reload] Load new control plane")
newC, err := newControlPlane(log, obj, dnsCache, newConf, externGeoDataDirs)
newC, err := newControlPlane(log, obj, dnsCache, newConf, externGeoDataDirs, newChMsg)
if err != nil {
/* dae-wing start */
errReload = err
Expand All @@ -155,7 +163,7 @@ loop:
"err": err,
}).Errorln("[Reload] Failed to reload; try to roll back configuration")
// Load last config back.
newC, err = newControlPlane(log, obj, dnsCache, conf, externGeoDataDirs)
newC, err = newControlPlane(log, obj, dnsCache, conf, externGeoDataDirs, ChMsg)
if err != nil {
obj.Close()
c.Close()
Expand All @@ -164,6 +172,7 @@ loop:
}).Fatalln("[Reload] Failed to roll back configuration")
}
newConf = conf
newChMsg = ChMsg
log.Errorln("[Reload] Last reload failed; rolled back configuration")
} else {
log.Warnln("[Reload] Stopped old control plane")
Expand All @@ -180,6 +189,8 @@ loop:
oldC := c
c = newC
conf = newConf
ChMsg = newChMsg
MsgProducer.ReassignChMsg(newChMsg)
reloading = true
/* dae-wing start */
chCallback = newReloadMsg.Callback
Expand All @@ -195,7 +206,7 @@ loop:
return nil
}

func newControlPlane(log *logrus.Logger, bpf interface{}, dnsCache map[string]*control.DnsCache, conf *daeConfig.Config, externGeoDataDirs []string) (c *control.ControlPlane, err error) {
func newControlPlane(log *logrus.Logger, bpf interface{}, dnsCache map[string]*control.DnsCache, conf *daeConfig.Config, externGeoDataDirs []string, chMsg chan<- *control.Msg) (c *control.ControlPlane, err error) {

// Print configuration.
if log.IsLevelEnabled(logrus.DebugLevel) {
Expand Down Expand Up @@ -229,17 +240,18 @@ func newControlPlane(log *logrus.Logger, bpf interface{}, dnsCache map[string]*c
}

// New dae control plane.
c, err = control.NewControlPlane(
log,
bpf,
dnsCache,
subscriptionToNodeList,
conf.Group,
&conf.Routing,
&conf.Global,
&conf.Dns,
externGeoDataDirs,
)
c, err = control.NewControlPlane(&control.Options{
Log: log,
Bpf: bpf,
DnsCache: dnsCache,
TagToNodeList: subscriptionToNodeList,
Groups: conf.Group,
RoutingA: &conf.Routing,
Global: &conf.Global,
DnsConfig: &conf.Dns,
ExternGeoDataDirs: externGeoDataDirs,
ChMsg: chMsg,
})
if err != nil {
return nil, err
}
Expand Down
11 changes: 8 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
module github.com/daeuniverse/dae-wing

go 1.19
go 1.21.0

toolchain go1.21.3

require (
github.com/daeuniverse/dae v0.2.0
github.com/glebarez/sqlite v1.8.0
github.com/golang-jwt/jwt/v5 v5.0.0
github.com/graph-gophers/graphql-go v1.5.1-0.20230228210639-f05ace9f4a41
github.com/json-iterator/go v1.1.12
github.com/lesismal/nbio v1.3.20
github.com/matoous/go-nanoid/v2 v2.0.0
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/mzz2017/softwind v0.0.0-20230803152605-5f1f6bc06934
Expand Down Expand Up @@ -35,7 +38,6 @@ require (
github.com/bits-and-blooms/bloom/v3 v3.5.0 // indirect
github.com/cilium/ebpf v0.11.0 // indirect
github.com/daeuniverse/dae-config-dist/go/dae_config v0.0.0-20230604120805-1c27619b592d // indirect
github.com/daeuniverse/outbound v0.0.0-20240101085641-7932e7df927d // indirect
github.com/daeuniverse/softwind v0.0.0-20231230065827-eed67f20d2c1 // indirect
github.com/dgryski/go-camellia v0.0.0-20191119043421-69a8a13fb23d // indirect
github.com/dgryski/go-idea v0.0.0-20170306091226-d2fb45a411fb // indirect
Expand All @@ -54,6 +56,7 @@ require (
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/lesismal/llib v1.1.12 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-sqlite3 v1.14.17 // indirect
Expand All @@ -75,7 +78,7 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
github.com/v2rayA/ahocorasick-domain v0.0.0-20231231085011-99ceb8ef3208 // indirect
github.com/v2rayA/ahocorasick-domain v0.0.0-20230218160829-122a074c48c8 // indirect
github.com/vishvananda/netns v0.0.4 // indirect
github.com/x-cray/logrus-prefixed-formatter v0.5.2 // indirect
gitlab.com/yawning/chacha20.git v0.0.0-20230427033715-7877545b1b37 // indirect
Expand All @@ -96,4 +99,6 @@ require (

replace github.com/daeuniverse/dae => ./dae-core

replace github.com/graph-gophers/graphql-transport-ws => github.com/mikhailv/graphql-transport-ws v0.0.0-20230405003623-3bf02386d7ce

// replace github.com/daeuniverse/dae => ../dae
Loading

0 comments on commit 95a77e5

Please sign in to comment.