Skip to content

Commit

Permalink
Proxy primary redirect timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson committed Sep 29, 2023
1 parent dce966e commit 429d1c4
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 11 deletions.
14 changes: 8 additions & 6 deletions cmd/litefs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func NewConfig() Config {
config.Log.Format = "text"

config.Proxy.MaxLag = http.DefaultMaxLag
config.Proxy.PrimaryRedirectTimeout = http.DefaultPrimaryRedirectTimeout

config.Tracing.Enabled = true
config.Tracing.MaxSize = DefaultTracingMaxSize
Expand Down Expand Up @@ -113,12 +114,13 @@ type HTTPConfig struct {

// ProxyConfig represents the configuration for the HTTP proxy server.
type ProxyConfig struct {
Addr string `yaml:"addr"`
Target string `yaml:"target"`
DB string `yaml:"db"`
MaxLag time.Duration `yaml:"max-lag"`
Debug bool `yaml:"debug"`
Passthrough []string `yaml:"passthrough"`
Addr string `yaml:"addr"`
Target string `yaml:"target"`
DB string `yaml:"db"`
MaxLag time.Duration `yaml:"max-lag"`
Debug bool `yaml:"debug"`
Passthrough []string `yaml:"passthrough"`
PrimaryRedirectTimeout time.Duration `yaml:"primary-redirect-timeout"`
}

// LeaseConfig represents a generic configuration for all lease types.
Expand Down
1 change: 1 addition & 0 deletions cmd/litefs/mount_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ func (c *MountCommand) runProxyServer(ctx context.Context) error {
server.MaxLag = c.Config.Proxy.MaxLag
server.Debug = c.Config.Proxy.Debug
server.Passthroughs = passthroughs
server.PrimaryRedirectTimeout = c.Config.Proxy.PrimaryRedirectTimeout
if err := server.Listen(); err != nil {
return err
}
Expand Down
17 changes: 12 additions & 5 deletions http/proxy_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ const (
DefaultPollTXIDInterval = 1 * time.Millisecond
DefaultPollTXIDTimeout = 5 * time.Second

DefaultPrimaryRedirectTimeout = 5 * time.Second

DefaultMaxLag = 10 * time.Second

DefaultCookieExpiry = 5 * time.Minute
Expand Down Expand Up @@ -63,6 +65,8 @@ type ProxyServer struct {
PollTXIDInterval time.Duration
PollTXIDTimeout time.Duration

PrimaryRedirectTimeout time.Duration

// Maximum allowable lag before the health endpoint returns an error code.
MaxLag time.Duration

Expand All @@ -77,10 +81,11 @@ func NewProxyServer(store *litefs.Store) *ProxyServer {
s := &ProxyServer{
store: store,

PollTXIDInterval: DefaultPollTXIDInterval,
PollTXIDTimeout: DefaultPollTXIDTimeout,
MaxLag: DefaultMaxLag,
CookieExpiry: DefaultCookieExpiry,
PollTXIDInterval: DefaultPollTXIDInterval,
PollTXIDTimeout: DefaultPollTXIDTimeout,
MaxLag: DefaultMaxLag,
CookieExpiry: DefaultCookieExpiry,
PrimaryRedirectTimeout: DefaultPrimaryRedirectTimeout,
}

s.ctx, s.cancel = context.WithCancelCause(context.Background())
Expand Down Expand Up @@ -253,7 +258,9 @@ LOOP:
}

func (s *ProxyServer) serveNonRead(w http.ResponseWriter, r *http.Request) {
isPrimary, info := s.store.PrimaryInfo()
ctx, cancel := context.WithTimeout(r.Context(), s.PrimaryRedirectTimeout)
defer cancel()
isPrimary, info := s.store.PrimaryInfoWithContext(ctx)

// If this is the primary, send the request to the target.
if isPrimary {
Expand Down
22 changes: 22 additions & 0 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,28 @@ func (s *Store) PrimaryInfo() (isPrimary bool, info *PrimaryInfo) {
return s.isPrimary(), s.primaryInfo.Clone()
}

// PrimaryInfoWithContext continually attempts to fetch the primary info until available.
// Returns when isPrimary is true, info is non-nil, or when ctx is done.
func (s *Store) PrimaryInfoWithContext(ctx context.Context) (isPrimary bool, info *PrimaryInfo) {
if isPrimary, info = s.PrimaryInfo(); isPrimary || info != nil {
return isPrimary, info
}

ticker := time.NewTicker(100 * time.Microsecond)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return isPrimary, info
case <-ticker.C:
if isPrimary, info = s.PrimaryInfo(); isPrimary || info != nil {
return isPrimary, info
}
}
}
}

func (s *Store) setPrimaryInfo(info *PrimaryInfo) {
s.primaryInfo = info
s.notifyPrimaryChange()
Expand Down

0 comments on commit 429d1c4

Please sign in to comment.