Skip to content

Commit

Permalink
rhp4: add rhp4 handler
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger committed Oct 22, 2024
1 parent 7d61fc0 commit 2c06116
Show file tree
Hide file tree
Showing 53 changed files with 1,243 additions and 1,191 deletions.
27 changes: 2 additions & 25 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"go.sia.tech/hostd/host/settings"
"go.sia.tech/hostd/host/settings/pin"
"go.sia.tech/hostd/host/storage"
"go.sia.tech/hostd/rhp"
"go.sia.tech/hostd/webhooks"
"go.sia.tech/jape"
"go.uber.org/zap"
Expand Down Expand Up @@ -86,7 +85,7 @@ type (
SetReadOnly(id int64, readOnly bool) error
RemoveSector(root types.Hash256) error
ResizeCache(size uint32)
Read(types.Hash256) (*[rhp2.SectorSize]byte, error)
ReadSector(types.Hash256) ([rhp2.SectorSize]byte, error)

// SectorReferences returns the references to a sector
SectorReferences(root types.Hash256) (storage.SectorReference, error)
Expand Down Expand Up @@ -151,14 +150,6 @@ type (
BroadcastToWebhook(id int64, event, scope string, data interface{}) error
}

// A RHPSessionReporter reports on RHP session lifecycle events
RHPSessionReporter interface {
Subscribe(rhp.SessionSubscriber)
Unsubscribe(rhp.SessionSubscriber)

Active() []rhp.Session
}

// An api provides an HTTP API for the host
api struct {
hostKey types.PublicKey
Expand All @@ -167,7 +158,6 @@ type (
log *zap.Logger
alerts Alerts
webhooks Webhooks
sessions RHPSessionReporter

sqlite3Store SQLite3Store

Expand Down Expand Up @@ -200,22 +190,12 @@ func (a *api) requiresExplorer(h jape.Handler) jape.Handler {
}
}

// NewServer initializes the API
// syncer
// chain
// accounts
// contracts
// volumes
// wallet
// metrics
// settings
// index
// NewServer initializes the API server with the given options
func NewServer(name string, hostKey types.PublicKey, cm ChainManager, s Syncer, am AccountManager, c ContractManager, vm VolumeManager, wm Wallet, mm MetricManager, sm Settings, im Index, opts ...ServerOption) http.Handler {
a := &api{
hostKey: hostKey,
name: name,

sessions: noopSessionReporter{},
alerts: noopAlerts{},
webhooks: noopWebhooks{},
log: zap.NewNop(),
Expand Down Expand Up @@ -290,9 +270,6 @@ func NewServer(name string, hostKey types.PublicKey, cm ChainManager, s Syncer,
"DELETE /volumes/:id": a.handleDeleteVolume,
"DELETE /volumes/:id/cancel": a.handleDELETEVolumeCancelOp,
"PUT /volumes/:id/resize": a.handlePUTVolumeResize,
// session endpoints
"GET /sessions": a.handleGETSessions,
"GET /sessions/subscribe": a.handleGETSessionsSubscribe,
// tpool endpoints
"GET /tpool/fee": a.handleGETTPoolFee,
// wallet endpoints
Expand Down
14 changes: 0 additions & 14 deletions api/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"go.sia.tech/core/types"
"go.sia.tech/hostd/alerts"
"go.sia.tech/hostd/explorer"
"go.sia.tech/hostd/rhp"
"go.sia.tech/hostd/webhooks"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -51,13 +50,6 @@ func WithExplorer(explorer *explorer.Explorer) ServerOption {
}
}

// WithRHPSessionReporter sets the RHP session reporter for the API server.
func WithRHPSessionReporter(rsr RHPSessionReporter) ServerOption {
return func(a *api) {
a.sessions = rsr
}
}

// WithLogger sets the logger for the API server.
func WithLogger(log *zap.Logger) ServerOption {
return func(a *api) {
Expand All @@ -81,9 +73,3 @@ type noopAlerts struct{}

func (noopAlerts) Active() []alerts.Alert { return nil }
func (noopAlerts) Dismiss(...types.Hash256) {}

type noopSessionReporter struct{}

func (noopSessionReporter) Subscribe(rhp.SessionSubscriber) {}
func (noopSessionReporter) Unsubscribe(rhp.SessionSubscriber) {}
func (noopSessionReporter) Active() []rhp.Session { return nil }
21 changes: 0 additions & 21 deletions api/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,24 +462,3 @@ func (w WalletPendingResp) PrometheusMetric() (metrics []prometheus.Metric) {
}
return
}

// PrometheusMetric returns Prometheus samples for the hosts sessions
func (s SessionResp) PrometheusMetric() (metrics []prometheus.Metric) {
for _, session := range s {
metrics = append(metrics, prometheus.Metric{
Name: "hostd_session_ingress",
Labels: map[string]any{
"peer": session.PeerAddress,
},
Value: float64(session.Ingress),
})
metrics = append(metrics, prometheus.Metric{
Name: "hostd_session_egress",
Labels: map[string]any{
"peer": session.PeerAddress,
},
Value: float64(session.Egress),
})
}
return
}
45 changes: 0 additions & 45 deletions api/rhpsessions.go

This file was deleted.

4 changes: 0 additions & 4 deletions api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"go.sia.tech/hostd/host/metrics"
"go.sia.tech/hostd/host/settings"
"go.sia.tech/hostd/host/storage"
"go.sia.tech/hostd/rhp"
)

// JSON keys for host setting fields
Expand Down Expand Up @@ -187,9 +186,6 @@ type (

// WalletPendingResp is the response body for the [GET] /wallet/pending endpoint
WalletPendingResp []wallet.Event

// SessionResp is the response body for the [GET] /sessions endpoint
SessionResp []rhp.Session
)

// MarshalJSON implements json.Marshaler
Expand Down
4 changes: 2 additions & 2 deletions api/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,10 @@ func (a *api) handleGETVerifySector(jc jape.Context) {
}

// try to read the sector data and verify the root
data, err := a.volumes.Read(root)
sector, err := a.volumes.ReadSector(root)
if err != nil {
resp.Error = err.Error()
} else if calc := rhp2.SectorRoot(data); calc != root {
} else if calc := rhp2.SectorRoot(&sector); calc != root {
resp.Error = fmt.Sprintf("sector is corrupt: expected root %q, got %q", root, calc)
}
jc.Encode(resp)
Expand Down
5 changes: 5 additions & 0 deletions cmd/hostd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ var (
RHP3: config.RHP3{
TCPAddress: ":9983",
},
RHP4: config.RHP4{
ListenAddresses: []config.RHP4ListenAddress{
{Protocol: "tcp", Address: ":9984"},
},
},
Log: config.Log{
Path: os.Getenv(logFileEnvVar), // deprecated. included for compatibility.
Level: "info",
Expand Down
86 changes: 54 additions & 32 deletions cmd/hostd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.sia.tech/core/types"
"go.sia.tech/coreutils"
"go.sia.tech/coreutils/chain"
rhp4 "go.sia.tech/coreutils/rhp/v4"
"go.sia.tech/coreutils/syncer"
"go.sia.tech/coreutils/wallet"
"go.sia.tech/hostd/alerts"
Expand Down Expand Up @@ -186,18 +187,6 @@ func runRootCmd(ctx context.Context, cfg config.Config, walletKey types.PrivateK
}
defer syncerListener.Close()

rhp2Listener, err := net.Listen("tcp", cfg.RHP2.Address)
if err != nil {
return fmt.Errorf("failed to listen on rhp2 addr: %w", err)
}
defer rhp2Listener.Close()

rhp3Listener, err := net.Listen("tcp", cfg.RHP3.TCPAddress)
if err != nil {
return fmt.Errorf("failed to listen on rhp3 addr: %w", err)
}
defer rhp3Listener.Close()

syncerAddr := syncerListener.Addr().String()
if cfg.Syncer.EnableUPnP {
_, portStr, _ := net.SplitHostPort(cfg.Syncer.Address)
Expand Down Expand Up @@ -238,73 +227,106 @@ func runRootCmd(ctx context.Context, cfg config.Config, walletKey types.PrivateK
go s.Run(ctx)
defer s.Close()

wm, err := wallet.NewSingleAddressWallet(walletKey, cm, store, wallet.WithLogger(log.Named("wallet")), wallet.WithReservationDuration(3*time.Hour))
if err != nil {
return fmt.Errorf("failed to create wallet: %w", err)
}
defer wm.Close()

wr, err := webhooks.NewManager(store, log.Named("webhooks"))
if err != nil {
return fmt.Errorf("failed to create webhook reporter: %w", err)
}
defer wr.Close()
sr := rhp.NewSessionReporter()

am := alerts.NewManager(alerts.WithEventReporter(wr), alerts.WithLog(log.Named("alerts")))

cfm, err := settings.NewConfigManager(hostKey, store, cm, s, wm, settings.WithAlertManager(am), settings.WithLog(log.Named("settings")))
wm, err := wallet.NewSingleAddressWallet(walletKey, cm, store, wallet.WithLogger(log.Named("wallet")), wallet.WithReservationDuration(3*time.Hour))
if err != nil {
return fmt.Errorf("failed to create settings manager: %w", err)
return fmt.Errorf("failed to create wallet: %w", err)
}
defer cfm.Close()
defer wm.Close()

vm, err := storage.NewVolumeManager(store, storage.WithLogger(log.Named("volumes")), storage.WithAlerter(am))
if err != nil {
return fmt.Errorf("failed to create storage manager: %w", err)
}
defer vm.Close()

contractManager, err := contracts.NewManager(store, vm, cm, s, wm, contracts.WithLog(log.Named("contracts")), contracts.WithAlerter(am))
sm, err := settings.NewConfigManager(hostKey, store, cm, s, wm, vm, settings.WithAlertManager(am), settings.WithLog(log.Named("settings")))
if err != nil {
return fmt.Errorf("failed to create settings manager: %w", err)
}
defer sm.Close()

contracts, err := contracts.NewManager(store, vm, cm, s, wm, contracts.WithLog(log.Named("contracts")), contracts.WithAlerter(am))
if err != nil {
return fmt.Errorf("failed to create contracts manager: %w", err)
}
defer contractManager.Close()
defer contracts.Close()

index, err := index.NewManager(store, cm, contractManager, wm, cfm, vm, index.WithLog(log.Named("index")), index.WithBatchSize(cfg.Consensus.IndexBatchSize))
index, err := index.NewManager(store, cm, contracts, wm, sm, vm, index.WithLog(log.Named("index")), index.WithBatchSize(cfg.Consensus.IndexBatchSize))
if err != nil {
return fmt.Errorf("failed to create index manager: %w", err)
}
defer index.Close()

dr := rhp.NewDataRecorder(store, log.Named("data"))
rl, wl := sm.RHPBandwidthLimiters()
rhp2Listener, err := rhp.Listen("tcp", cfg.RHP2.Address, rhp.WithDataMonitor(dr), rhp.WithReadLimit(rl), rhp.WithWriteLimit(wl))
if err != nil {
return fmt.Errorf("failed to listen on rhp2 addr: %w", err)
}
defer rhp2Listener.Close()

rhp3Listener, err := rhp.Listen("tcp", cfg.RHP3.TCPAddress, rhp.WithDataMonitor(dr), rhp.WithReadLimit(rl), rhp.WithWriteLimit(wl))
if err != nil {
return fmt.Errorf("failed to listen on rhp3 addr: %w", err)
}
defer rhp3Listener.Close()

rhp2, err := rhp2.NewSessionHandler(rhp2Listener, hostKey, rhp3Listener.Addr().String(), cm, s, wm, contractManager, cfm, vm, rhp2.WithDataMonitor(dr), rhp2.WithLog(log.Named("rhp2")))
rhp2, err := rhp2.NewSessionHandler(rhp2Listener, hostKey, rhp3Listener.Addr().String(), cm, s, wm, contracts, sm, vm, log.Named("rhp2"))
if err != nil {
return fmt.Errorf("failed to create rhp2 session handler: %w", err)
}
go rhp2.Serve()
defer rhp2.Close()

registry := registry.NewManager(hostKey, store, log.Named("registry"))
accounts := accounts.NewManager(store, cfm)
rhp3, err := rhp3.NewSessionHandler(rhp3Listener, hostKey, cm, s, wm, accounts, contractManager, registry, vm, cfm, rhp3.WithDataMonitor(dr), rhp3.WithSessionReporter(sr), rhp3.WithLog(log.Named("rhp3")))
accounts := accounts.NewManager(store, sm)
rhp3, err := rhp3.NewSessionHandler(rhp3Listener, hostKey, cm, s, wm, accounts, contracts, registry, vm, sm, log.Named("rhp3"))
if err != nil {
return fmt.Errorf("failed to create rhp3 session handler: %w", err)
}
go rhp3.Serve()
defer rhp3.Close()

rhp4 := rhp4.NewServer(hostKey, cm, s, contracts, wm, sm, vm, rhp4.WithPriceTableValidity(30*time.Minute), rhp4.WithContractProofWindowBuffer(72))

var stopListenerFuncs []func() error
defer func() {
for _, f := range stopListenerFuncs {
if err := f(); err != nil {
log.Error("failed to stop listener", zap.Error(err))
}
}
}()
for _, addr := range cfg.RHP4.ListenAddresses {
switch addr.Protocol {
case "tcp", "tcp4", "tcp6":
l, err := rhp.Listen(addr.Protocol, addr.Address, rhp.WithDataMonitor(dr), rhp.WithReadLimit(rl), rhp.WithWriteLimit(wl))
if err != nil {
return fmt.Errorf("failed to listen on rhp4 addr: %w", err)
}
stopListenerFuncs = append(stopListenerFuncs, l.Close)
go serveRHP4SiaMux(l, rhp4, log.Named("rhp4"))
default:
return fmt.Errorf("unsupported protocol: %s", addr.Protocol)
}
}

apiOpts := []api.ServerOption{
api.WithAlerts(am),
api.WithLogger(log.Named("api")),
api.WithRHPSessionReporter(sr),
api.WithWebhooks(wr),
api.WithSQLite3Store(store),
}
if !cfg.Explorer.Disable {
ex := explorer.New(cfg.Explorer.URL)
pm, err := pin.NewManager(store, cfm, ex, pin.WithLogger(log.Named("pin")))
pm, err := pin.NewManager(store, sm, ex, pin.WithLogger(log.Named("pin")))
if err != nil {
return fmt.Errorf("failed to create pin manager: %w", err)
}
Expand All @@ -314,7 +336,7 @@ func runRootCmd(ctx context.Context, cfg config.Config, walletKey types.PrivateK

web := http.Server{
Handler: webRouter{
api: jape.BasicAuth(cfg.HTTP.Password)(api.NewServer(cfg.Name, hostKey.PublicKey(), cm, s, accounts, contractManager, vm, wm, store, cfm, index, apiOpts...)),
api: jape.BasicAuth(cfg.HTTP.Password)(api.NewServer(cfg.Name, hostKey.PublicKey(), cm, s, accounts, contracts, vm, wm, store, sm, index, apiOpts...)),
ui: hostd.Handler(),
},
ReadTimeout: 30 * time.Second,
Expand Down
Loading

0 comments on commit 2c06116

Please sign in to comment.