From 5d53c071efc7c92b013b12ccc5321e859f8f4810 Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Wed, 5 Jun 2024 04:00:08 +0530 Subject: [PATCH 01/15] dynamic config from catalog: groundwork (#1771) 1. Renames `alerting_settings` table and adds columns to make it more useful for dynamic configuration. 2. Some code cleanup and genericization Default values should only be set from catalog, a migration will be added for existing settings and new settings to be added to this table. UI to be added in a future PR. --- flow/alerting/alerting.go | 20 ++++- flow/connectors/bigquery/bigquery.go | 5 +- flow/dynamicconf/dynamicconf.go | 87 ++++++++++--------- .../V27__dynconf_table_and_settings.sql | 10 +++ protos/flow.proto | 20 +++++ 5 files changed, 98 insertions(+), 44 deletions(-) create mode 100644 nexus/catalog/migrations/V27__dynconf_table_and_settings.sql diff --git a/flow/alerting/alerting.go b/flow/alerting/alerting.go index 9405cbf4bc..5914c8b997 100644 --- a/flow/alerting/alerting.go +++ b/flow/alerting/alerting.go @@ -123,7 +123,11 @@ func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID()) } - defaultSlotLagMBAlertThreshold := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx) + defaultSlotLagMBAlertThreshold, err := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx) + if err != nil { + logger.LoggerFromCtx(ctx).Warn("failed to get slot lag alert threshold from catalog", slog.Any("error", err)) + return + } // catalog cannot use default threshold to space alerts properly, use the lowest set threshold instead lowestSlotLagMBAlertThreshold := defaultSlotLagMBAlertThreshold for _, alertSender := range alertSenderConfigs { @@ -171,7 +175,11 @@ func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string, } // same as with slot lag, use lowest threshold for catalog - defaultOpenConnectionsThreshold := 
dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx) + defaultOpenConnectionsThreshold, err := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx) + if err != nil { + logger.LoggerFromCtx(ctx).Warn("failed to get open connections alert threshold from catalog", slog.Any("error", err)) + return + } lowestOpenConnectionsThreshold := defaultOpenConnectionsThreshold for _, alertSender := range alertSenderConfigs { if alertSender.Sender.getOpenConnectionsAlertThreshold() > 0 { @@ -216,7 +224,11 @@ func (a *Alerter) alertToProvider(ctx context.Context, alertSenderConfig AlertSe // in the past X minutes, where X is configurable and defaults to 15 minutes // returns true if alert added to catalog, so proceed with processing alerts to slack func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertConfigId int64, alertKey string, alertMessage string) bool { - dur := dynamicconf.PeerDBAlertingGapMinutesAsDuration(ctx) + dur, err := dynamicconf.PeerDBAlertingGapMinutesAsDuration(ctx) + if err != nil { + logger.LoggerFromCtx(ctx).Warn("failed to get alerting gap duration from catalog", slog.Any("error", err)) + return false + } if dur == 0 { logger.LoggerFromCtx(ctx).Warn("Alerting disabled via environment variable, returning") return false @@ -227,7 +239,7 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertConfigId i ORDER BY created_timestamp DESC LIMIT 1`, alertKey, alertConfigId) var createdTimestamp time.Time - err := row.Scan(&createdTimestamp) + err = row.Scan(&createdTimestamp) if err != nil && err != pgx.ErrNoRows { logger.LoggerFromCtx(ctx).Warn("failed to send alert: ", slog.String("err", err.Error())) return false diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 9b10b1f32d..afa06a7cad 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -681,7 +681,10 @@ func (c *BigQueryConnector) SetupNormalizedTable( } } - timePartitionEnabled := 
dynamicconf.PeerDBBigQueryEnableSyncedAtPartitioning(ctx) + timePartitionEnabled, err := dynamicconf.PeerDBBigQueryEnableSyncedAtPartitioning(ctx) + if err != nil { + return false, fmt.Errorf("failed to get dynamic setting for BigQuery time partitioning: %w", err) + } var timePartitioning *bigquery.TimePartitioning if timePartitionEnabled && syncedAtColName != "" { timePartitioning = &bigquery.TimePartitioning{ diff --git a/flow/dynamicconf/dynamicconf.go b/flow/dynamicconf/dynamicconf.go index aedbce65ce..9fe99d1102 100644 --- a/flow/dynamicconf/dynamicconf.go +++ b/flow/dynamicconf/dynamicconf.go @@ -2,104 +2,113 @@ package dynamicconf import ( "context" + "fmt" "strconv" "time" "github.com/jackc/pgx/v5/pgtype" - "github.com/jackc/pgx/v5/pgxpool" + "golang.org/x/exp/constraints" "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/peerdbenv" ) -func dynamicConfKeyExists(ctx context.Context, conn *pgxpool.Pool, key string) bool { - var exists pgtype.Bool - query := "SELECT EXISTS(SELECT 1 FROM alerting_settings WHERE config_name = $1)" - err := conn.QueryRow(ctx, query, key).Scan(&exists) +//nolint:unused +func dynamicConfSigned[T constraints.Signed](ctx context.Context, key string) (T, error) { + conn, err := peerdbenv.GetCatalogConnectionPoolFromEnv(ctx) if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to check if key exists: %v", err) - return false + logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err) + return 0, fmt.Errorf("failed to get catalog connection pool: %w", err) } - return exists.Bool + var value pgtype.Text + query := "SELECT coalesce(config_value,config_default_value) FROM dynamic_settings WHERE config_name=$1" + err = conn.QueryRow(ctx, query, key).Scan(&value) + if err != nil { + logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err) + return 0, fmt.Errorf("failed to get key: %w", err) + } + + result, err := strconv.ParseInt(value.String, 10, 64) + if err != nil { + 
logger.LoggerFromCtx(ctx).Error("Failed to parse as int64: %v", err) + return 0, fmt.Errorf("failed to parse as int64: %w", err) + } + + return T(result), nil } -func dynamicConfUint32(ctx context.Context, key string, defaultValue uint32) uint32 { +func dynamicConfUnsigned[T constraints.Unsigned](ctx context.Context, key string) (T, error) { conn, err := peerdbenv.GetCatalogConnectionPoolFromEnv(ctx) if err != nil { logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err) - return defaultValue - } - - if !dynamicConfKeyExists(ctx, conn, key) { - return defaultValue + return 0, fmt.Errorf("failed to get catalog connection pool: %w", err) } var value pgtype.Text - query := "SELECT config_value FROM alerting_settings WHERE config_name = $1" + query := "SELECT coalesce(config_value,config_default_value) FROM dynamic_settings WHERE config_name=$1" err = conn.QueryRow(ctx, query, key).Scan(&value) if err != nil { logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err) - return defaultValue + return 0, fmt.Errorf("failed to get key: %w", err) } - result, err := strconv.ParseUint(value.String, 10, 32) + result, err := strconv.ParseUint(value.String, 10, 64) if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to parse uint32: %v", err) - return defaultValue + logger.LoggerFromCtx(ctx).Error("Failed to parse as int64: %v", err) + return 0, fmt.Errorf("failed to parse as int64: %w", err) } - return uint32(result) + return T(result), nil } -func dynamicConfBool(ctx context.Context, key string, defaultValue bool) bool { +func dynamicConfBool(ctx context.Context, key string) (bool, error) { conn, err := peerdbenv.GetCatalogConnectionPoolFromEnv(ctx) if err != nil { logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err) - return defaultValue - } - - if !dynamicConfKeyExists(ctx, conn, key) { - return defaultValue + return false, fmt.Errorf("failed to get catalog connection pool: %w", err) } var value pgtype.Text - 
query := "SELECT config_value FROM alerting_settings WHERE config_name = $1" + query := "SELECT coalesce(config_value,config_default_value) FROM dynamic_settings WHERE config_name = $1" err = conn.QueryRow(ctx, query, key).Scan(&value) if err != nil { logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err) - return defaultValue + return false, fmt.Errorf("failed to get key: %w", err) } result, err := strconv.ParseBool(value.String) if err != nil { logger.LoggerFromCtx(ctx).Error("Failed to parse bool: %v", err) - return defaultValue + return false, fmt.Errorf("failed to parse bool: %w", err) } - return result + return result, nil } // PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely -func PeerDBSlotLagMBAlertThreshold(ctx context.Context) uint32 { - return dynamicConfUint32(ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000) +func PeerDBSlotLagMBAlertThreshold(ctx context.Context) (uint32, error) { + return dynamicConfUnsigned[uint32](ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD") } // PEERDB_ALERTING_GAP_MINUTES, 0 disables all alerting entirely -func PeerDBAlertingGapMinutesAsDuration(ctx context.Context) time.Duration { - why := int64(dynamicConfUint32(ctx, "PEERDB_ALERTING_GAP_MINUTES", 15)) - return time.Duration(why) * time.Minute +func PeerDBAlertingGapMinutesAsDuration(ctx context.Context) (time.Duration, error) { + why, err := dynamicConfUnsigned[uint32](ctx, "PEERDB_ALERTING_GAP_MINUTES") + if err != nil { + return 0, err + } + return time.Duration(int64(why)) * time.Minute, nil } // PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD, 0 disables open connections alerting entirely -func PeerDBOpenConnectionsAlertThreshold(ctx context.Context) uint32 { - return dynamicConfUint32(ctx, "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5) +func PeerDBOpenConnectionsAlertThreshold(ctx context.Context) (uint32, error) { + return dynamicConfUnsigned[uint32](ctx, "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD") } // 
PEERDB_BIGQUERY_ENABLE_SYNCED_AT_PARTITIONING_BY_DAYS, for creating target tables with // partitioning by _PEERDB_SYNCED_AT column // If true, the target tables will be partitioned by _PEERDB_SYNCED_AT column // If false, the target tables will not be partitioned -func PeerDBBigQueryEnableSyncedAtPartitioning(ctx context.Context) bool { - return dynamicConfBool(ctx, "PEERDB_BIGQUERY_ENABLE_SYNCED_AT_PARTITIONING_BY_DAYS", false) +func PeerDBBigQueryEnableSyncedAtPartitioning(ctx context.Context) (bool, error) { + return dynamicConfBool(ctx, "PEERDB_BIGQUERY_ENABLE_SYNCED_AT_PARTITIONING_BY_DAYS") } diff --git a/nexus/catalog/migrations/V27__dynconf_table_and_settings.sql b/nexus/catalog/migrations/V27__dynconf_table_and_settings.sql new file mode 100644 index 0000000000..adfcf167bd --- /dev/null +++ b/nexus/catalog/migrations/V27__dynconf_table_and_settings.sql @@ -0,0 +1,10 @@ +ALTER TABLE alerting_settings RENAME TO dynamic_settings; +ALTER TABLE dynamic_settings ADD COLUMN config_default_value TEXT, ADD COLUMN config_value_type INT, ADD COLUMN config_description TEXT, ADD COLUMN config_apply_mode INT; +ALTER TABLE dynamic_settings ALTER COLUMN config_value DROP NOT NULL; + +INSERT INTO dynamic_settings (config_name,config_value,config_default_value,config_value_type,config_description,config_apply_mode) +VALUES +('PEERDB_ALERTING_GAP_MINUTES',null,'15',3,'Duration in minutes before reraising alerts, 0 disables all alerting entirely',1), +('PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD',null,'5000',3,'Lag (in MB) threshold on PeerDB slot to start sending alerts, 0 disables slot lag alerting entirely',1), +('PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD',null,'5',3,'Open connections from PeerDB user threshold to start sending alerts, 0 disables open connections alerting entirely',1), +('PEERDB_BIGQUERY_ENABLE_SYNCED_AT_PARTITIONING_BY_DAYS',null,'false',4,'BigQuery only: create target tables with partitioning by _PEERDB_SYNCED_AT column',4); diff --git a/protos/flow.proto 
b/protos/flow.proto index 4d4343f71a..c7f8e4fa25 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -404,3 +404,23 @@ message ExportTxSnapshotOutput { bool supports_tid_scans = 2; } +enum DynconfValueType { + UNKNOWN = 0; + STRING = 1; + INT = 2; + UINT = 3; + BOOL = 4; +} + +enum DynconfApplyMode { + APPLY_MODE_UNKNOWN = 0; + // should apply immediately + APPLY_MODE_IMMEDIATE = 1; + // should apply after the mirror is paused and resumed + APPLY_MODE_AFTER_RESUME = 2; + // should apply after pod is restarted + APPLY_MODE_RESTART = 3; + // only applies to newly created mirrors + APPLY_MODE_NEW_MIRROR = 4; +} + From 6ad9352361c6ef5fd82cda276a2e824d7218d2db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 5 Jun 2024 14:51:09 +0000 Subject: [PATCH 02/15] dynamicconf: lookup from env before using default. Also fix migration version number (#1787) Long term want to completely converge env/dynamicconf, but for now I have <10 min left in 8VC office & want to fix migration mixup. So existing env options don't have dynamicconf support now, but dynamicconf variables can be overridden using env if desirable. Just need to convert all env only variables to dynamicconf variables Long term want to add mirror level env hashmap too etc. 
So can have env defaults in a hashmap or something rather than storing every default in db --- flow/alerting/alerting.go | 7 +- flow/connectors/bigquery/bigquery.go | 3 +- .../{dynamicconf => peerdbenv}/dynamicconf.go | 71 ++++++++++--------- ...ql => V29__dynconf_table_and_settings.sql} | 0 4 files changed, 41 insertions(+), 40 deletions(-) rename flow/{dynamicconf => peerdbenv}/dynamicconf.go (60%) rename nexus/catalog/migrations/{V27__dynconf_table_and_settings.sql => V29__dynconf_table_and_settings.sql} (100%) diff --git a/flow/alerting/alerting.go b/flow/alerting/alerting.go index 5914c8b997..b5f21ffb57 100644 --- a/flow/alerting/alerting.go +++ b/flow/alerting/alerting.go @@ -12,7 +12,6 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" - "github.com/PeerDB-io/peer-flow/dynamicconf" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/peerdbenv" @@ -123,7 +122,7 @@ func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID()) } - defaultSlotLagMBAlertThreshold, err := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx) + defaultSlotLagMBAlertThreshold, err := peerdbenv.PeerDBSlotLagMBAlertThreshold(ctx) if err != nil { logger.LoggerFromCtx(ctx).Warn("failed to get slot lag alert threshold from catalog", slog.Any("error", err)) return @@ -175,7 +174,7 @@ func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string, } // same as with slot lag, use lowest threshold for catalog - defaultOpenConnectionsThreshold, err := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx) + defaultOpenConnectionsThreshold, err := peerdbenv.PeerDBOpenConnectionsAlertThreshold(ctx) if err != nil { logger.LoggerFromCtx(ctx).Warn("failed to get open connections alert threshold from catalog", slog.Any("error", err)) return @@ -224,7 +223,7 @@ func (a *Alerter) alertToProvider(ctx 
context.Context, alertSenderConfig AlertSe // in the past X minutes, where X is configurable and defaults to 15 minutes // returns true if alert added to catalog, so proceed with processing alerts to slack func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertConfigId int64, alertKey string, alertMessage string) bool { - dur, err := dynamicconf.PeerDBAlertingGapMinutesAsDuration(ctx) + dur, err := peerdbenv.PeerDBAlertingGapMinutesAsDuration(ctx) if err != nil { logger.LoggerFromCtx(ctx).Warn("failed to get alerting gap duration from catalog", slog.Any("error", err)) return false diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index afa06a7cad..19f359ff0d 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -18,7 +18,6 @@ import ( metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" numeric "github.com/PeerDB-io/peer-flow/datatypes" - "github.com/PeerDB-io/peer-flow/dynamicconf" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" @@ -681,7 +680,7 @@ func (c *BigQueryConnector) SetupNormalizedTable( } } - timePartitionEnabled, err := dynamicconf.PeerDBBigQueryEnableSyncedAtPartitioning(ctx) + timePartitionEnabled, err := peerdbenv.PeerDBBigQueryEnableSyncedAtPartitioning(ctx) if err != nil { return false, fmt.Errorf("failed to get dynamic setting for BigQuery time partitioning: %w", err) } diff --git a/flow/dynamicconf/dynamicconf.go b/flow/peerdbenv/dynamicconf.go similarity index 60% rename from flow/dynamicconf/dynamicconf.go rename to flow/peerdbenv/dynamicconf.go index 9fe99d1102..c939f8ccf0 100644 --- a/flow/dynamicconf/dynamicconf.go +++ b/flow/peerdbenv/dynamicconf.go @@ -1,35 +1,56 @@ -package dynamicconf +package peerdbenv import ( "context" "fmt" + "os" "strconv" "time" + "github.com/jackc/pgx/v5" 
"github.com/jackc/pgx/v5/pgtype" "golang.org/x/exp/constraints" "github.com/PeerDB-io/peer-flow/logger" - "github.com/PeerDB-io/peer-flow/peerdbenv" ) -//nolint:unused -func dynamicConfSigned[T constraints.Signed](ctx context.Context, key string) (T, error) { - conn, err := peerdbenv.GetCatalogConnectionPoolFromEnv(ctx) +func dynLookup(ctx context.Context, key string) (string, error) { + conn, err := GetCatalogConnectionPoolFromEnv(ctx) if err != nil { logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err) - return 0, fmt.Errorf("failed to get catalog connection pool: %w", err) + return "", fmt.Errorf("failed to get catalog connection pool: %w", err) } var value pgtype.Text - query := "SELECT coalesce(config_value,config_default_value) FROM dynamic_settings WHERE config_name=$1" - err = conn.QueryRow(ctx, query, key).Scan(&value) + var default_value pgtype.Text + query := "SELECT config_value, config_default_value FROM dynamic_settings WHERE config_name=$1" + err = conn.QueryRow(ctx, query, key).Scan(&value, &default_value) if err != nil { + if err == pgx.ErrNoRows { + if val, ok := os.LookupEnv(key); ok { + return val, nil + } + } logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err) - return 0, fmt.Errorf("failed to get key: %w", err) + return "", fmt.Errorf("failed to get key: %w", err) + } + if !value.Valid { + if val, ok := os.LookupEnv(key); ok { + return val, nil + } + return default_value.String, nil + } + return value.String, nil +} + +//nolint:unused +func dynamicConfSigned[T constraints.Signed](ctx context.Context, key string) (T, error) { + value, err := dynLookup(ctx, key) + if err != nil { + return 0, err } - result, err := strconv.ParseInt(value.String, 10, 64) + result, err := strconv.ParseInt(value, 10, 64) if err != nil { logger.LoggerFromCtx(ctx).Error("Failed to parse as int64: %v", err) return 0, fmt.Errorf("failed to parse as int64: %w", err) @@ -39,21 +60,12 @@ func dynamicConfSigned[T 
constraints.Signed](ctx context.Context, key string) (T } func dynamicConfUnsigned[T constraints.Unsigned](ctx context.Context, key string) (T, error) { - conn, err := peerdbenv.GetCatalogConnectionPoolFromEnv(ctx) + value, err := dynLookup(ctx, key) if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err) - return 0, fmt.Errorf("failed to get catalog connection pool: %w", err) - } - - var value pgtype.Text - query := "SELECT coalesce(config_value,config_default_value) FROM dynamic_settings WHERE config_name=$1" - err = conn.QueryRow(ctx, query, key).Scan(&value) - if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err) - return 0, fmt.Errorf("failed to get key: %w", err) + return 0, err } - result, err := strconv.ParseUint(value.String, 10, 64) + result, err := strconv.ParseUint(value, 10, 64) if err != nil { logger.LoggerFromCtx(ctx).Error("Failed to parse as int64: %v", err) return 0, fmt.Errorf("failed to parse as int64: %w", err) @@ -63,21 +75,12 @@ func dynamicConfUnsigned[T constraints.Unsigned](ctx context.Context, key string } func dynamicConfBool(ctx context.Context, key string) (bool, error) { - conn, err := peerdbenv.GetCatalogConnectionPoolFromEnv(ctx) + value, err := dynLookup(ctx, key) if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err) - return false, fmt.Errorf("failed to get catalog connection pool: %w", err) - } - - var value pgtype.Text - query := "SELECT coalesce(config_value,config_default_value) FROM dynamic_settings WHERE config_name = $1" - err = conn.QueryRow(ctx, query, key).Scan(&value) - if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err) - return false, fmt.Errorf("failed to get key: %w", err) + return false, err } - result, err := strconv.ParseBool(value.String) + result, err := strconv.ParseBool(value) if err != nil { logger.LoggerFromCtx(ctx).Error("Failed to parse bool: %v", err) return 
false, fmt.Errorf("failed to parse bool: %w", err) diff --git a/nexus/catalog/migrations/V27__dynconf_table_and_settings.sql b/nexus/catalog/migrations/V29__dynconf_table_and_settings.sql similarity index 100% rename from nexus/catalog/migrations/V27__dynconf_table_and_settings.sql rename to nexus/catalog/migrations/V29__dynconf_table_and_settings.sql From d81ca53c31e82d45496f220f49a3a0a6bf660a6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 5 Jun 2024 16:11:51 +0000 Subject: [PATCH 03/15] PEERDB_ALERTING_GAP_MINUTES is int64 (#1788) CodeQL didn't like uint32 conversion to int64 --- flow/peerdbenv/dynamicconf.go | 39 ++++++++++++++++------------------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/flow/peerdbenv/dynamicconf.go b/flow/peerdbenv/dynamicconf.go index c939f8ccf0..b43c7a6964 100644 --- a/flow/peerdbenv/dynamicconf.go +++ b/flow/peerdbenv/dynamicconf.go @@ -43,50 +43,47 @@ func dynLookup(ctx context.Context, key string) (string, error) { return value.String, nil } -//nolint:unused -func dynamicConfSigned[T constraints.Signed](ctx context.Context, key string) (T, error) { +func dynLookupConvert[T any](ctx context.Context, key string, fn func(string) (T, error)) (T, error) { value, err := dynLookup(ctx, key) if err != nil { - return 0, err + var none T + return none, err } + return fn(value) +} - result, err := strconv.ParseInt(value, 10, 64) +func dynamicConfSigned[T constraints.Signed](ctx context.Context, key string) (T, error) { + value, err := dynLookupConvert(ctx, key, func(value string) (int64, error) { + return strconv.ParseInt(value, 10, 64) + }) if err != nil { logger.LoggerFromCtx(ctx).Error("Failed to parse as int64: %v", err) return 0, fmt.Errorf("failed to parse as int64: %w", err) } - return T(result), nil + return T(value), nil } func dynamicConfUnsigned[T constraints.Unsigned](ctx context.Context, key string) (T, error) { - value, err := dynLookup(ctx, key) - if err != nil { - return 0, err - } - 
- result, err := strconv.ParseUint(value, 10, 64) + value, err := dynLookupConvert(ctx, key, func(value string) (uint64, error) { + return strconv.ParseUint(value, 10, 64) + }) if err != nil { logger.LoggerFromCtx(ctx).Error("Failed to parse as int64: %v", err) return 0, fmt.Errorf("failed to parse as int64: %w", err) } - return T(result), nil + return T(value), nil } func dynamicConfBool(ctx context.Context, key string) (bool, error) { - value, err := dynLookup(ctx, key) - if err != nil { - return false, err - } - - result, err := strconv.ParseBool(value) + value, err := dynLookupConvert(ctx, key, strconv.ParseBool) if err != nil { logger.LoggerFromCtx(ctx).Error("Failed to parse bool: %v", err) return false, fmt.Errorf("failed to parse bool: %w", err) } - return result, nil + return value, nil } // PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely @@ -96,11 +93,11 @@ func PeerDBSlotLagMBAlertThreshold(ctx context.Context) (uint32, error) { // PEERDB_ALERTING_GAP_MINUTES, 0 disables all alerting entirely func PeerDBAlertingGapMinutesAsDuration(ctx context.Context) (time.Duration, error) { - why, err := dynamicConfUnsigned[uint32](ctx, "PEERDB_ALERTING_GAP_MINUTES") + why, err := dynamicConfSigned[int64](ctx, "PEERDB_ALERTING_GAP_MINUTES") if err != nil { return 0, err } - return time.Duration(int64(why)) * time.Minute, nil + return time.Duration(why) * time.Minute, nil } // PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD, 0 disables open connections alerting entirely From 24596291be6366427aef061f2adefc793c2a6b7c Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Wed, 5 Jun 2024 23:52:14 +0530 Subject: [PATCH 04/15] UI: consolidated logs section (#1785) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces a 'Logs' section in Sidebar where users can see logs across all mirrors.
This is useful as now users don't need to click on logs of each mirror to search for erroring mirrors Can filter by mirror name and log level Screenshot 2024-06-04 at 10 42 42 PM --- ui/app/api/mirrors/errors/route.ts | 37 +++--- ui/app/api/mirrors/names/route.ts | 22 ++++ ui/app/dto/AlertDTO.ts | 9 +- ui/app/dto/MirrorsDTO.ts | 7 ++ ui/app/mirror-logs/layout.tsx | 11 ++ ui/app/mirror-logs/page.tsx | 20 ++++ ui/app/mirror-logs/table.tsx | 122 ++++++++++++++++++++ ui/app/mirrors/errors/[mirrorName]/page.tsx | 89 ++------------ ui/components/LogsTable.tsx | 115 ++++++++++++++++++ ui/components/SidebarComponent.tsx | 7 ++ ui/lib/Table/TableCell.styles.ts | 6 +- 11 files changed, 345 insertions(+), 100 deletions(-) create mode 100644 ui/app/api/mirrors/names/route.ts create mode 100644 ui/app/mirror-logs/layout.tsx create mode 100644 ui/app/mirror-logs/page.tsx create mode 100644 ui/app/mirror-logs/table.tsx create mode 100644 ui/components/LogsTable.tsx diff --git a/ui/app/api/mirrors/errors/route.ts b/ui/app/api/mirrors/errors/route.ts index f56dc98b7d..b0113395d2 100644 --- a/ui/app/api/mirrors/errors/route.ts +++ b/ui/app/api/mirrors/errors/route.ts @@ -10,19 +10,27 @@ export async function POST(request: Request) { const alertReq: MirrorLogsRequest = body; const skip = (alertReq.page - 1) * alertReq.numPerPage; - const mirrorErrors: MirrorLog[] = await prisma.flow_errors.findMany({ - where: { - OR: [ - { - flow_name: { - contains: alertReq.flowJobName, + const whereClause: any = alertReq.flowJobName + ? 
{ + OR: [ + { + flow_name: { + contains: alertReq.flowJobName, + }, }, - }, - { - flow_name: alertReq.flowJobName, - }, - ], - }, + { + flow_name: alertReq.flowJobName, + }, + ], + } + : {}; + + if (alertReq.natureOfLog && alertReq.natureOfLog !== 'ALL') { + whereClause['error_type'] = alertReq.natureOfLog.toLowerCase(); + } + + const mirrorErrors: MirrorLog[] = await prisma.flow_errors.findMany({ + where: whereClause, orderBy: { error_timestamp: 'desc', }, @@ -32,16 +40,13 @@ export async function POST(request: Request) { error_message: true, error_type: true, error_timestamp: true, - ack: true, }, take: alertReq.numPerPage, skip, }); const total = await prisma.flow_errors.count({ - where: { - flow_name: alertReq.flowJobName, - }, + where: whereClause, }); const alertRes: MirrorLogsResponse = { diff --git a/ui/app/api/mirrors/names/route.ts b/ui/app/api/mirrors/names/route.ts new file mode 100644 index 0000000000..0a9a0726a0 --- /dev/null +++ b/ui/app/api/mirrors/names/route.ts @@ -0,0 +1,22 @@ +import prisma from '@/app/utils/prisma'; +export const dynamic = 'force-dynamic'; + +export async function GET(request: Request) { + const mirrorNames = await prisma.flow_errors.findMany({ + select: { + flow_name: true, + }, + // where flow_name is not like 'clone_%' + where: { + NOT: { + flow_name: { + startsWith: 'clone_', + }, + }, + }, + distinct: ['flow_name'], + }); + return new Response( + JSON.stringify(mirrorNames.map((mirror) => mirror.flow_name)) + ); +} diff --git a/ui/app/dto/AlertDTO.ts b/ui/app/dto/AlertDTO.ts index ee7277f962..85b58fa694 100644 --- a/ui/app/dto/AlertDTO.ts +++ b/ui/app/dto/AlertDTO.ts @@ -6,8 +6,16 @@ export type UAlertConfigResponse = { service_config: Prisma.JsonValue; }; +export enum LogType { + ERROR = 'ERROR', + WARNING = 'WARNING', + INFO = 'INFO', + ALL = 'ALL', +} + export type MirrorLogsRequest = { flowJobName: string; + natureOfLog?: LogType; page: number; numPerPage: number; }; @@ -17,7 +25,6 @@ export type MirrorLog = { 
error_message: string; error_type: string; error_timestamp: Date; - ack: boolean; }; export type MirrorLogsResponse = { diff --git a/ui/app/dto/MirrorsDTO.ts b/ui/app/dto/MirrorsDTO.ts index 7bf9b0fac2..23133bc589 100644 --- a/ui/app/dto/MirrorsDTO.ts +++ b/ui/app/dto/MirrorsDTO.ts @@ -49,3 +49,10 @@ export type MirrorRowsData = { deleteCount: number; totalCount: number; }; + +export type MirrorLogsType = { + flow_name: string; + error_message: string; + error_type: string; + error_timestamp: Date; +}[]; diff --git a/ui/app/mirror-logs/layout.tsx b/ui/app/mirror-logs/layout.tsx new file mode 100644 index 0000000000..3968fee83d --- /dev/null +++ b/ui/app/mirror-logs/layout.tsx @@ -0,0 +1,11 @@ +import SidebarComponent from '@/components/SidebarComponent'; +import { Layout } from '@/lib/Layout'; +import { PropsWithChildren, Suspense } from 'react'; + +export default function PageLayout({ children }: PropsWithChildren) { + return ( + }> + {children} + + ); +} diff --git a/ui/app/mirror-logs/page.tsx b/ui/app/mirror-logs/page.tsx new file mode 100644 index 0000000000..e18c2847d0 --- /dev/null +++ b/ui/app/mirror-logs/page.tsx @@ -0,0 +1,20 @@ +import { Header } from '@/lib/Header'; +import LogsView from './table'; + +const MirrorLogs = async () => { + return ( +
+
Logs
+ +
+ ); +}; + +export default MirrorLogs; diff --git a/ui/app/mirror-logs/table.tsx b/ui/app/mirror-logs/table.tsx new file mode 100644 index 0000000000..7e6c4d6367 --- /dev/null +++ b/ui/app/mirror-logs/table.tsx @@ -0,0 +1,122 @@ +'use client'; + +import { + LogType, + MirrorLog, + MirrorLogsRequest, + MirrorLogsResponse, +} from '@/app/dto/AlertDTO'; +import LogsTable from '@/components/LogsTable'; +import { ProgressCircle } from '@/lib/ProgressCircle'; +import { useEffect, useState } from 'react'; +import ReactSelect from 'react-select'; +import 'react-toastify/dist/ReactToastify.css'; +import useSWR from 'swr'; +import { useLocalStorage } from 'usehooks-ts'; +import { fetcher } from '../utils/swr'; + +export default function LogsView() { + const [logs, setLogs] = useState([]); + const [mirrorName, setMirrorName] = useLocalStorage( + 'peerdbMirrorNameFilterForLogs', + '' + ); + const [natureOfLog, setNatureOfLog] = useLocalStorage( + 'peerdbLogTypeFilterForLogs', + LogType.ALL + ); + const [currentPage, setCurrentPage] = useState(1); + const [totalPages, setTotalPages] = useState(1); + const { data: mirrors }: { data: string[]; error: any } = useSWR( + '/api/mirrors/names', + fetcher + ); + + useEffect(() => { + setCurrentPage(1); + }, [mirrorName]); + + useEffect(() => { + const req: MirrorLogsRequest = { + natureOfLog: natureOfLog, + flowJobName: mirrorName, + page: currentPage, + numPerPage: 15, + }; + + const fetchData = async () => { + try { + const response = await fetch('/api/mirrors/errors', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + cache: 'no-store', + body: JSON.stringify(req), + }); + const data: MirrorLogsResponse = await response.json(); + const numPages = Math.ceil(data.total / req.numPerPage); + setLogs(data.errors); + setTotalPages(numPages); + } catch (error) { + console.error('Error fetching mirror logs:', error); + } + }; + + fetchData(); + }, [currentPage, mirrorName, natureOfLog]); + + if (!mirrors) { + 
return ; + } + return ( +
+
+
+ ({ + value: mirror, + label: mirror, + }))} + onChange={(selectedOption) => + setMirrorName(selectedOption?.value ?? '') + } + placeholder='Filter by mirror' + /> +
+
+ ({ + value: type, + label: type, + }))} + onChange={(selectedOption) => + setNatureOfLog(selectedOption?.value ?? LogType.ALL) + } + placeholder='Filter by log type' + /> +
+
+ +
+ ); +} diff --git a/ui/app/mirrors/errors/[mirrorName]/page.tsx b/ui/app/mirrors/errors/[mirrorName]/page.tsx index e2fad6b8b5..441f04fe0d 100644 --- a/ui/app/mirrors/errors/[mirrorName]/page.tsx +++ b/ui/app/mirrors/errors/[mirrorName]/page.tsx @@ -5,27 +5,13 @@ import { MirrorLogsRequest, MirrorLogsResponse, } from '@/app/dto/AlertDTO'; -import TimeLabel from '@/components/TimeComponent'; -import { Button } from '@/lib/Button'; -import { Icon } from '@/lib/Icon'; +import LogsTable from '@/components/LogsTable'; import { Label } from '@/lib/Label'; -import { Table, TableCell, TableRow } from '@/lib/Table'; import { useParams } from 'next/navigation'; import { useEffect, useState } from 'react'; import { ToastContainer } from 'react-toastify'; import 'react-toastify/dist/ReactToastify.css'; -const colorForErrorType = (errorType: string) => { - const errorUpper = errorType.toUpperCase(); - if (errorUpper === 'ERROR') { - return '#F45156'; - } else if (errorUpper === 'WARNING') { - return '#FFC107'; - } else { - return '#4CAF50'; - } -}; - export default function MirrorError() { const params = useParams<{ mirrorName: string }>(); const [mirrorErrors, setMirrorErrors] = useState([]); @@ -65,18 +51,6 @@ export default function MirrorError() { fetchData(); }, [currentPage, params.mirrorName]); - const handleNextPage = () => { - if (currentPage < totalPages) { - setCurrentPage(currentPage + 1); - } - }; - - const handlePrevPage = () => { - if (currentPage > 1) { - setCurrentPage(currentPage - 1); - } - }; - return ( <>
@@ -98,61 +72,12 @@ export default function MirrorError() {
- - Type - - - - Message - - - } - toolbar={{ - left: ( -
- - - - -
- ), - }} - > - {mirrorErrors.map((mirrorError, idx) => ( - - - {mirrorError.error_type.toUpperCase()} - - - - - - {mirrorError.error_message} - - - ))} -
+ diff --git a/ui/components/LogsTable.tsx b/ui/components/LogsTable.tsx new file mode 100644 index 0000000000..3b65f89f28 --- /dev/null +++ b/ui/components/LogsTable.tsx @@ -0,0 +1,115 @@ +import { MirrorLog } from '@/app/dto/AlertDTO'; +import TimeLabel from '@/components/TimeComponent'; +import { Button } from '@/lib/Button'; +import { Icon } from '@/lib/Icon'; +import { Label } from '@/lib/Label'; +import { Table, TableCell, TableRow } from '@/lib/Table'; +import 'react-toastify/dist/ReactToastify.css'; + +const colorForErrorType = (errorType: string) => { + const errorUpper = errorType.toUpperCase(); + if (errorUpper === 'ERROR') { + return '#F45156'; + } else if (errorUpper === 'WARNING') { + return '#FFC107'; + } else { + return '#4CAF50'; + } +}; + +const extractFromCloneName = (mirrorOrCloneName: string) => { + if (mirrorOrCloneName.includes('clone_')) { + return mirrorOrCloneName.split('_')[1] + ' (initial load)'; + } + return mirrorOrCloneName; +}; + +const LogsTable = ({ + logs, + currentPage, + totalPages, + setCurrentPage, +}: { + logs: MirrorLog[]; + currentPage: number; + totalPages: number; + setCurrentPage: (page: number) => void; +}) => { + const handleNextPage = () => { + if (currentPage < totalPages) { + setCurrentPage(currentPage + 1); + } + }; + const handlePrevPage = () => { + if (currentPage > 1) { + setCurrentPage(currentPage - 1); + } + }; + + return ( + + Type + + + + Mirror + Message + + + } + toolbar={{ + left: ( +
+ + + + +
+ ), + }} + > + {logs.map((log, idx) => ( + + + {log.error_type.toUpperCase()} + + + + + + + + + {log.error_message} + + + ))} +
+ ); +}; + +export default LogsTable; diff --git a/ui/components/SidebarComponent.tsx b/ui/components/SidebarComponent.tsx index 3985c7dcaa..9f91cad9f8 100644 --- a/ui/components/SidebarComponent.tsx +++ b/ui/components/SidebarComponent.tsx @@ -150,6 +150,13 @@ export default function SidebarComponent() { > {sidebarState === 'open' && 'Scripts'} + } + > + {sidebarState === 'open' && 'Logs'} + ); diff --git a/ui/lib/Table/TableCell.styles.ts b/ui/lib/Table/TableCell.styles.ts index 5a4eba2d37..c9c1e257f1 100644 --- a/ui/lib/Table/TableCell.styles.ts +++ b/ui/lib/Table/TableCell.styles.ts @@ -11,6 +11,11 @@ const variants = { min-width: ${({ theme }) => theme.size.xxSmall}; max-width: ${({ theme }) => theme.size.xSmall}; `, + mirror_name: css` + overflow: auto; + min-width: ${({ theme }) => theme.size.small}; + max-width: ${({ theme }) => theme.size.medium}; + `, }; export type TableCellVariant = keyof typeof variants; @@ -18,7 +23,6 @@ type BaseTableCellProps = { $variant: TableCellVariant; }; export const BaseTableCell = styled.td` - border-collapse: collapse; overflow: hidden; ${({ $variant }) => variants[$variant]} `; From 32a395ef84cf821a6e97f0d1d868c74fb721480f Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 5 Jun 2024 11:40:57 -0700 Subject: [PATCH 05/15] feat: Add Settings Page for Dynamic Settings (#1789) Introduced a new settings page for managing dynamic settings with backend API routes and frontend components. - Error handling in API routes. - State management for editing in UI. - Updated database indexing for efficient querying. 
https://www.loom.com/share/ce7541a92538431f8c55b3ffcf712eea?sid=04cf9b72-6631-49be-9073-6ef8161432d4 --- ui/app/api/settings/route.ts | 47 ++++ .../mirrors/status/qrep/[mirrorId]/page.tsx | 2 +- ui/app/settings/layout.tsx | 7 + ui/app/settings/page.tsx | 235 ++++++++++++++++++ ui/components/SidebarComponent.tsx | 7 + ui/prisma/schema.prisma | 35 +-- 6 files changed, 318 insertions(+), 15 deletions(-) create mode 100644 ui/app/api/settings/route.ts create mode 100644 ui/app/settings/layout.tsx create mode 100644 ui/app/settings/page.tsx diff --git a/ui/app/api/settings/route.ts b/ui/app/api/settings/route.ts new file mode 100644 index 0000000000..318d5e0e3b --- /dev/null +++ b/ui/app/api/settings/route.ts @@ -0,0 +1,47 @@ +import prisma from '@/app/utils/prisma'; +import { dynamic_settings } from '@prisma/client'; + +export async function GET() { + try { + const configs: dynamic_settings[] = + await prisma.dynamic_settings.findMany(); + const serializedConfigs = configs.map((config) => ({ + ...config, + id: config.id, + })); + return new Response(JSON.stringify(serializedConfigs)); + } catch (error) { + console.error('Error fetching dynamic settings:', error); + return new Response( + JSON.stringify({ error: 'Failed to fetch dynamic settings' }), + { status: 500 } + ); + } +} + +export async function POST(request: Request) { + try { + const configReq: dynamic_settings = await request.json(); + const updateRes = await prisma.dynamic_settings.update({ + where: { + id: configReq.id, + }, + data: { + config_value: configReq.config_value, + }, + }); + + let updateStatus: 'success' | 'error' = 'error'; + if (updateRes.id) { + updateStatus = 'success'; + } + + return new Response(updateStatus); + } catch (error) { + console.error('Error updating dynamic setting:', error); + return new Response( + JSON.stringify({ error: 'Failed to update dynamic setting' }), + { status: 500 } + ); + } +} diff --git a/ui/app/mirrors/status/qrep/[mirrorId]/page.tsx 
b/ui/app/mirrors/status/qrep/[mirrorId]/page.tsx index 02e0a758b6..9620cac16e 100644 --- a/ui/app/mirrors/status/qrep/[mirrorId]/page.tsx +++ b/ui/app/mirrors/status/qrep/[mirrorId]/page.tsx @@ -33,7 +33,7 @@ export default async function QRepMirrorStatus({ startTime: run.start_time, endTime: run.end_time, pulledRows: run.rows_in_partition, - syncedRows: run.rows_synced, + syncedRows: Number(run.rows_synced), }; return ret; }); diff --git a/ui/app/settings/layout.tsx b/ui/app/settings/layout.tsx new file mode 100644 index 0000000000..69a53b44ea --- /dev/null +++ b/ui/app/settings/layout.tsx @@ -0,0 +1,7 @@ +import SidebarComponent from '@/components/SidebarComponent'; +import { Layout } from '@/lib/Layout'; +import { PropsWithChildren } from 'react'; + +export default function PageLayout({ children }: PropsWithChildren) { + return }>{children}; +} diff --git a/ui/app/settings/page.tsx b/ui/app/settings/page.tsx new file mode 100644 index 0000000000..18af3ff63f --- /dev/null +++ b/ui/app/settings/page.tsx @@ -0,0 +1,235 @@ +'use client'; + +import { DynconfApplyMode } from '@/grpc_generated/flow'; +import { Button } from '@/lib/Button'; +import { Icon } from '@/lib/Icon'; +import { Label } from '@/lib/Label'; +import { SearchField } from '@/lib/SearchField'; +import { Table, TableCell, TableRow } from '@/lib/Table'; +import { TextField } from '@/lib/TextField'; +import { Tooltip } from '@/lib/Tooltip'; +import { dynamic_settings } from '@prisma/client'; +import { MaterialSymbol } from 'material-symbols'; +import { useEffect, useMemo, useState } from 'react'; + +const ROWS_PER_PAGE = 7; + +const ApplyModeIconWithTooltip = ({ applyMode }: { applyMode: number }) => { + let tooltipText = ''; + let iconName: MaterialSymbol = 'help'; + + switch (applyMode) { + case DynconfApplyMode.APPLY_MODE_IMMEDIATE: + tooltipText = 'Changes to this configuration will apply immediately'; + iconName = 'bolt'; + break; + case DynconfApplyMode.APPLY_MODE_AFTER_RESUME: + tooltipText = 
'Changes to this configuration will apply after resume'; + iconName = 'cached'; + break; + case DynconfApplyMode.APPLY_MODE_RESTART: + tooltipText = + 'Changes to this configuration will apply after server restart.'; + iconName = 'restart_alt'; + break; + case DynconfApplyMode.APPLY_MODE_NEW_MIRROR: + tooltipText = + 'Changes to this configuration will apply only to new mirrors'; + iconName = 'new_window'; + break; + default: + tooltipText = 'Unknown apply mode'; + iconName = 'help'; + } + + return ( +
+ + + +
+ ); +}; + +const DynamicSettingItem = ({ + setting, + onSettingUpdate, +}: { + setting: dynamic_settings; + onSettingUpdate: () => void; +}) => { + const [editMode, setEditMode] = useState(false); + const [newValue, setNewValue] = useState(setting.config_value); + + const handleEdit = () => { + setEditMode(true); + }; + + const handleSave = async () => { + const updatedSetting = { ...setting, config_value: newValue }; + await fetch('/api/settings', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify(updatedSetting), + }); + setEditMode(false); + onSettingUpdate(); + }; + + return ( + + + + + + {editMode ? ( +
+ setNewValue(e.target.value)} + variant='simple' + /> + +
+ ) : ( +
+ {setting.config_value || 'N/A'} + +
+ )} +
+ + {setting.config_default_value || 'N/A'} + + + {setting.config_description || 'N/A'} + + + + +
+ ); +}; + +const SettingsPage = () => { + const [settings, setSettings] = useState([]); + const [currentPage, setCurrentPage] = useState(1); + const [searchQuery, setSearchQuery] = useState(''); + const [sortDir, setSortDir] = useState<'asc' | 'dsc'>('asc'); + const sortField = 'config_name'; + + const fetchSettings = async () => { + const response = await fetch('/api/settings'); + const data = await response.json(); + setSettings(data); + }; + + useEffect(() => { + fetchSettings(); + }, []); + + const totalPages = Math.ceil(settings.length / ROWS_PER_PAGE); + + const displayedSettings = useMemo(() => { + const filteredSettings = settings.filter((setting) => + setting.config_name.toLowerCase().includes(searchQuery.toLowerCase()) + ); + filteredSettings.sort((a, b) => { + const aValue = a[sortField]; + const bValue = b[sortField]; + if (aValue === null || bValue === null) return 0; + if (aValue < bValue) return sortDir === 'dsc' ? 1 : -1; + if (aValue > bValue) return sortDir === 'dsc' ? -1 : 1; + return 0; + }); + + const startRow = (currentPage - 1) * ROWS_PER_PAGE; + const endRow = startRow + ROWS_PER_PAGE; + return filteredSettings.slice(startRow, endRow); + }, [settings, currentPage, searchQuery, sortField, sortDir]); + + const handlePrevPage = () => { + if (currentPage > 1) setCurrentPage(currentPage - 1); + }; + + const handleNextPage = () => { + if (currentPage < totalPages) setCurrentPage(currentPage + 1); + }; + + return ( +
+ Settings List} + toolbar={{ + left: ( +
+ + + + + + +
+ ), + right: ( + setSearchQuery(e.target.value)} + /> + ), + }} + header={ + + {[ + { header: 'Configuration Name', width: '35%' }, + { header: 'Current Value', width: '10%' }, + { header: 'Default Value', width: '10%' }, + { header: 'Description', width: '35%' }, + { header: 'Apply Mode', width: '10%' }, + ].map(({ header, width }) => ( + + {header} + + ))} + + } + > + {displayedSettings.map((setting) => ( + + ))} +
+
+ ); +}; + +export default SettingsPage; diff --git a/ui/components/SidebarComponent.tsx b/ui/components/SidebarComponent.tsx index 9f91cad9f8..5be2e128f6 100644 --- a/ui/components/SidebarComponent.tsx +++ b/ui/components/SidebarComponent.tsx @@ -157,6 +157,13 @@ export default function SidebarComponent() { > {sidebarState === 'open' && 'Logs'} + } + > + {sidebarState === 'open' && 'Settings'} + ); diff --git a/ui/prisma/schema.prisma b/ui/prisma/schema.prisma index 4e3309aa80..f792406ac8 100644 --- a/ui/prisma/schema.prisma +++ b/ui/prisma/schema.prisma @@ -122,7 +122,7 @@ model qrep_partitions { restart_count Int metadata Json? id Int @id @default(autoincrement()) - rows_synced Int? + rows_synced BigInt? qrep_runs qrep_runs @relation(fields: [flow_name, run_uuid], references: [flow_name, run_uuid], onDelete: Cascade, onUpdate: NoAction, map: "fk_qrep_partitions_run") @@unique([run_uuid, partition_uuid]) @@ -213,20 +213,14 @@ model schema_deltas_audit_log { @@schema("peerdb_stats") } -model alerting_settings { - id Int @id(map: "alerting_settings_pkey1") @default(autoincrement()) - config_name String - config_value String - - @@schema("public") -} - model dynamic_settings { - id Int @id(map: "alerting_settings_pkey") @default(autoincrement()) - config_name String @unique(map: "idx_alerting_settings_config_name") - config_value String - setting_description String? - needs_restart Boolean? + id Int @id(map: "alerting_settings_pkey") @default(autoincrement()) + config_name String @unique(map: "idx_alerting_settings_config_name") + config_value String? + config_default_value String? + config_value_type Int? + config_description String? + config_apply_mode Int? 
@@schema("public") } @@ -261,6 +255,19 @@ model scripts { @@schema("public") } +model ch_s3_stage { + id Int @id @default(autoincrement()) + flow_job_name String + sync_batch_id BigInt + avro_file Json + created_at DateTime @default(now()) @db.Timestamptz(6) + + @@unique([flow_job_name, sync_batch_id]) + @@index([flow_job_name]) + @@index([sync_batch_id]) + @@schema("public") +} + enum script_lang { lua From 26a47ea0418431b73dad3486f58bb21cfd459c3f Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Thu, 6 Jun 2024 01:22:28 +0530 Subject: [PATCH 06/15] ReplicateQRepPartitions: Tweak activity params (#1791) replicateQRepPartition activity settings: - heartbeat timeout: increase to 5 minutes. We do heartbeat in the activity but 1 minute is too close - maximum retry interval to 10 minutes instead of 1 hour. An error in qrep can soon lead to having to wait for an hour to see a fix resolve the mirror - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --- flow/workflows/qrep_flow.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 75d9946526..71628d2917 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -201,11 +201,11 @@ func (q *QRepPartitionFlowExecution) replicatePartitions(ctx workflow.Context, ) error { ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 24 * 5 * time.Hour, - HeartbeatTimeout: time.Minute, + HeartbeatTimeout: 5 * time.Minute, RetryPolicy: &temporal.RetryPolicy{ InitialInterval: time.Minute, BackoffCoefficient: 2., - MaximumInterval: time.Hour, + MaximumInterval: 10 * time.Minute, MaximumAttempts: 0, NonRetryableErrorTypes: nil, }, From 34f0f745de0062eed83584c1d7d25ac4aa171d2b Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 5 Jun 2024 19:00:51 -0700 Subject: [PATCH 07/15] feat(cdc): enhance schema box with schema settings and target schema (#1793) 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The rationale here is that editing the schema individually is not an ideal experience. Screenshot 2024-06-05 at 4 46 24 PM - Added `SchemaSettings` component for modifying target schema. - Introduced `defaultTargetSchema` state to manage target schema. - Implemented `fetchTablesForSchema` function to handle table fetching with the target schema. - Modified `fetchTables` function to accept `targetSchemaName` parameter. - Updated table row handling to use `value` instead of `defaultValue` for destination inputs. - Improved schema box initialization with `useEffect` to fetch tables based on `defaultTargetSchema`. --- ui/app/mirrors/create/cdc/schemabox.tsx | 56 +++++++++---- ui/app/mirrors/create/cdc/schemasettings.tsx | 85 ++++++++++++++++++++ ui/app/mirrors/create/handlers.ts | 16 +++- 3 files changed, 141 insertions(+), 16 deletions(-) create mode 100644 ui/app/mirrors/create/cdc/schemasettings.tsx diff --git a/ui/app/mirrors/create/cdc/schemabox.tsx b/ui/app/mirrors/create/cdc/schemabox.tsx index 4ec403f8ee..35ae2bf95d 100644 --- a/ui/app/mirrors/create/cdc/schemabox.tsx +++ b/ui/app/mirrors/create/cdc/schemabox.tsx @@ -1,4 +1,5 @@ 'use client'; + import { TableMapRow } from '@/app/dto/MirrorsDTO'; import { DBType } from '@/grpc_generated/peers'; import { Checkbox } from '@/lib/Checkbox'; @@ -12,12 +13,14 @@ import { Dispatch, SetStateAction, useCallback, + useEffect, useMemo, useState, } from 'react'; import { BarLoader } from 'react-spinners/'; import { fetchColumns, fetchTables } from '../handlers'; import ColumnBox from './columnbox'; +import { SchemaSettings } from './schemasettings'; import { expandableStyle, schemaBoxStyle, @@ -37,6 +40,7 @@ interface SchemaBoxProps { peerType?: DBType; omitAdditionalTables: string[] | undefined; } + const SchemaBox = ({ sourcePeer, peerType, @@ -51,6 +55,8 @@ const SchemaBox = ({ const [columnsLoading, setColumnsLoading] = 
useState(false); const [expandedSchemas, setExpandedSchemas] = useState([]); const [tableQuery, setTableQuery] = useState(''); + const [defaultTargetSchema, setDefaultTargetSchema] = + useState(schema); const searchedTables = useMemo(() => { const tableQueryLower = tableQuery.toLowerCase(); @@ -146,19 +152,7 @@ const SchemaBox = ({ setExpandedSchemas((curr) => [...curr, schemaName]); if (rowsDoNotHaveSchemaTables(schemaName)) { - setTablesLoading(true); - fetchTables(sourcePeer, schemaName, peerType).then((newRows) => { - for (const row of newRows) { - if (omitAdditionalTables?.includes(row.source)) { - row.canMirror = false; - } - } - setRows((oldRows) => [ - ...oldRows.filter((oldRow) => oldRow.schema !== schema), - ...newRows, - ]); - setTablesLoading(false); - }); + fetchTablesForSchema(schemaName); } } else { setExpandedSchemas((curr) => @@ -167,6 +161,34 @@ const SchemaBox = ({ } }; + const fetchTablesForSchema = useCallback( + (schemaName: string) => { + setTablesLoading(true); + fetchTables(sourcePeer, schemaName, defaultTargetSchema, peerType).then( + (newRows) => { + for (const row of newRows) { + if (omitAdditionalTables?.includes(row.source)) { + row.canMirror = false; + } + } + setRows((oldRows) => { + const filteredRows = oldRows.filter( + (oldRow) => oldRow.schema !== schemaName + ); + const updatedRows = [...filteredRows, ...newRows]; + return updatedRows; + }); + setTablesLoading(false); + } + ); + }, + [setRows, sourcePeer, defaultTargetSchema, peerType, omitAdditionalTables] + ); + + useEffect(() => { + fetchTablesForSchema(schema); + }, [schema, fetchTablesForSchema]); + return (
@@ -200,6 +222,12 @@ const SchemaBox = ({ setTableQuery(e.target.value) } /> +
+ +
{/* TABLE BOX */} @@ -273,7 +301,7 @@ const SchemaBox = ({ }} variant='simple' placeholder={'Enter target table'} - defaultValue={row.destination} + value={row.destination} onChange={(e: React.ChangeEvent) => updateDestination(row.source, e.target.value) } diff --git a/ui/app/mirrors/create/cdc/schemasettings.tsx b/ui/app/mirrors/create/cdc/schemasettings.tsx new file mode 100644 index 0000000000..3a2b1e580f --- /dev/null +++ b/ui/app/mirrors/create/cdc/schemasettings.tsx @@ -0,0 +1,85 @@ +import { Icon } from '@/lib/Icon'; +import * as Popover from '@radix-ui/react-popover'; +import { useState } from 'react'; + +export const SchemaSettings = ({ + schema, + setTargetSchemaOverride, +}: { + schema: string; + setTargetSchemaOverride: (schema: string) => void; +}) => { + const [inputValue, setInputValue] = useState(schema); + const [savedIndicator, setSavedIndicator] = useState(false); + + const handleInputChange = (e: React.ChangeEvent) => { + setInputValue(e.target.value); + }; + + const handleSave = () => { + setTargetSchemaOverride(inputValue); + setSavedIndicator(true); + setTimeout(() => setSavedIndicator(false), 3000); + }; + + return ( + + +
+ +
+
+ + + +
+

Schema On Target

+ + + {savedIndicator && ( + + success + + )} +
+
+
+
+ ); +}; diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts index 5df96cfb76..72ee1d5486 100644 --- a/ui/app/mirrors/create/handlers.ts +++ b/ui/app/mirrors/create/handlers.ts @@ -309,12 +309,19 @@ const getDefaultDestinationTable = ( peerType.toString() == 'BIGQUERY' || dBTypeToJSON(peerType) == 'BIGQUERY' ) { - return tableName; + if (schemaName.length === 0) { + return tableName; + } + return `${schemaName}_${tableName}`; } + if ( peerType.toString() == 'CLICKHOUSE' || dBTypeToJSON(peerType) == 'CLICKHOUSE' ) { + if (schemaName.length === 0) { + return tableName; + } return `${schemaName}_${tableName}`; } @@ -325,12 +332,17 @@ const getDefaultDestinationTable = ( return `.${schemaName}_${tableName}.`; } + if (schemaName.length === 0) { + return tableName; + } + return `${schemaName}.${tableName}`; }; export const fetchTables = async ( peerName: string, schemaName: string, + targetSchemaName: string, peerType?: DBType ) => { if (schemaName.length === 0) return []; @@ -351,7 +363,7 @@ export const fetchTables = async ( // for bigquery, tables are not schema-qualified const dstName = getDefaultDestinationTable( peerType!, - schemaName, + targetSchemaName, tableObject.tableName ); tables.push({ From dbf5a948dc9cb9bff8077b9340ba2640693900f7 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Thu, 6 Jun 2024 19:01:29 +0530 Subject: [PATCH 08/15] BigQuery rename tables: account for json column in replica identity full table (#1795) When handling soft deletes, we perform equality checks for primary key columns in _resync and original tables. For a replica identity full table, we mark all columns as primary keys. If one of the columns in that case is a JSON, `=` comparison fails. Need to use TO_JSON_STRING instead like how we do for merge. 
Functionally tested --- flow/connectors/bigquery/bigquery.go | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 19f359ff0d..bcfffaae79 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -749,9 +749,15 @@ func (c *BigQueryConnector) RenameTables(ctx context.Context, req *protos.Rename continue } + // For a table with replica identity full and a JSON column + // the equals to comparison we do down below will fail + // so we need to use TO_JSON_STRING for those columns + columnIsJSON := make(map[string]bool, len(renameRequest.TableSchema.Columns)) columnNames := make([]string, 0, len(renameRequest.TableSchema.Columns)) for _, col := range renameRequest.TableSchema.Columns { - columnNames = append(columnNames, "`"+col.Name+"`") + quotedCol := "`" + col.Name + "`" + columnNames = append(columnNames, quotedCol) + columnIsJSON[quotedCol] = (col.Type == "json" || col.Type == "jsonb") } if req.SoftDeleteColName != nil { @@ -777,10 +783,19 @@ func (c *BigQueryConnector) RenameTables(ctx context.Context, req *protos.Rename pkeyOnClauseBuilder := strings.Builder{} ljWhereClauseBuilder := strings.Builder{} for idx, col := range pkeyCols { - pkeyOnClauseBuilder.WriteString("_pt.") - pkeyOnClauseBuilder.WriteString(col) - pkeyOnClauseBuilder.WriteString(" = _resync.") - pkeyOnClauseBuilder.WriteString(col) + if columnIsJSON[col] { + // We need to use TO_JSON_STRING for comparing JSON columns + pkeyOnClauseBuilder.WriteString("TO_JSON_STRING(_pt.") + pkeyOnClauseBuilder.WriteString(col) + pkeyOnClauseBuilder.WriteString(")=TO_JSON_STRING(_resync.") + pkeyOnClauseBuilder.WriteString(col) + pkeyOnClauseBuilder.WriteString(")") + } else { + pkeyOnClauseBuilder.WriteString("_pt.") + pkeyOnClauseBuilder.WriteString(col) + pkeyOnClauseBuilder.WriteString("=_resync.") + pkeyOnClauseBuilder.WriteString(col) + } 
ljWhereClauseBuilder.WriteString("_resync.") ljWhereClauseBuilder.WriteString(col) From 70a81c93c956d9edd5f6b5be51ea5d21f235e0d7 Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Thu, 6 Jun 2024 22:04:44 +0530 Subject: [PATCH 09/15] UI: fix dst table overwrite in QRep mirror creation (#1799) When the watermark/source table changed, we had logic to reset the destination table name to the watermark table name. This was done improperly, causing a desync between the value displayed to the user and the actual value in the config that the mirror is created with. Fixed by removing the logic entirely; users should manually change the destination table if the watermark table is updated after the destination table. --- ui/app/mirrors/create/qrep/qrep.tsx | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/ui/app/mirrors/create/qrep/qrep.tsx b/ui/app/mirrors/create/qrep/qrep.tsx index 8488b5a766..ab5a0d4926 100644 --- a/ui/app/mirrors/create/qrep/qrep.tsx +++ b/ui/app/mirrors/create/qrep/qrep.tsx @@ -2,7 +2,6 @@ import SelectTheme from '@/app/styles/select'; import { RequiredIndicator } from '@/components/RequiredIndicator'; import { QRepConfig, QRepWriteType } from '@/grpc_generated/flow'; -import { DBType } from '@/grpc_generated/peers'; import { Label } from '@/lib/Label'; import { RowWithSelect, RowWithSwitch, RowWithTextField } from '@/lib/Layout'; import { Switch } from '@/lib/Switch'; @@ -105,17 +104,6 @@ export default function QRepConfigForm({ ) => { if (val) { if (setting.label.includes('Table')) { - if (mirrorConfig.destinationPeer?.type === DBType.BIGQUERY) { - setter((curr) => ({ - ...curr, - destinationTableIdentifier: val.split('.')[1], - })); - } else { - setter((curr) => ({ - ...curr, - destinationTableIdentifier: val, - })); - } loadColumnOptions(val); } handleChange(val, setting); From 9b8457f54f999546ff0deff314baa5869df44c64 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Thu, 6 Jun 2024 23:39:27 +0530 Subject: [PATCH 
10/15] create mirror UI: make publication field clearable (#1798) Currently cannot cancel a dropdown selection of publication name, thus requiring reload --- ui/app/mirrors/create/cdc/fields.tsx | 1 + 1 file changed, 1 insertion(+) diff --git a/ui/app/mirrors/create/cdc/fields.tsx b/ui/app/mirrors/create/cdc/fields.tsx index 3a1f8064b2..1c4bb3ace0 100644 --- a/ui/app/mirrors/create/cdc/fields.tsx +++ b/ui/app/mirrors/create/cdc/fields.tsx @@ -78,6 +78,7 @@ const CDCField = ({ getOptionValue={(option) => option.option} theme={SelectTheme} isLoading={optionsLoading} + isClearable={true} /> {setting.tips && ( From 267a1bfbf36accd829ba0a5ee6d9e695cc0c4b4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 6 Jun 2024 20:36:28 +0000 Subject: [PATCH 11/15] PEERDB_QUEUE_FORCE_TOPIC_CREATION (#1790) Confluent cloud non-dedicated instances can't enable topic auto create, at the same time auto create on PubSub is a bad default since without subscriber data is lost Introduce environment variable to enable creating topics when topics don't exist, off by default --- .github/workflows/flow.yml | 3 ++- flow/connectors/kafka/kafka.go | 38 ++++++++++++++++++++++++++++---- flow/connectors/pubsub/pubsub.go | 18 ++++++++++----- flow/e2e/pubsub/pubsub_test.go | 1 - flow/go.mod | 1 + flow/go.sum | 2 ++ flow/peerdbenv/dynamicconf.go | 19 +++++++++++----- 7 files changed, 64 insertions(+), 18 deletions(-) diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index fb1774a769..266492fb56 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -127,7 +127,7 @@ jobs: AZURE_CLIENT_ID: ${{ secrets.AZURE_CLIENT_ID }} AZURE_CLIENT_SECRET: ${{ secrets.AZURE_CLIENT_SECRET }} AZURE_SUBSCRIPTION_ID: ${{ secrets.AZURE_SUBSCRIPTION_ID }} - ENABLE_SQLSERVER_TESTS: true + ENABLE_SQLSERVER_TESTS: "true" SQLSERVER_HOST: ${{ secrets.SQLSERVER_HOST }} SQLSERVER_PORT: ${{ secrets.SQLSERVER_PORT }} SQLSERVER_USER: ${{ secrets.SQLSERVER_USER }} @@ -138,4 
+138,5 @@ jobs: PEERDB_CATALOG_USER: postgres PEERDB_CATALOG_PASSWORD: postgres PEERDB_CATALOG_DATABASE: postgres + PEERDB_QUEUE_FORCE_TOPIC_CREATION: "true" ELASTICSEARCH_TEST_ADDRESS: http://localhost:9200 diff --git a/flow/connectors/kafka/kafka.go b/flow/connectors/kafka/kafka.go index ec7decc50c..816c08c46d 100644 --- a/flow/connectors/kafka/kafka.go +++ b/flow/connectors/kafka/kafka.go @@ -3,11 +3,14 @@ package connkafka import ( "context" "crypto/tls" + "errors" "fmt" "log/slog" "sync/atomic" "time" + "github.com/twmb/franz-go/pkg/kadm" + "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/sasl/plain" "github.com/twmb/franz-go/pkg/sasl/scram" @@ -40,7 +43,6 @@ func NewKafkaConnector( kgo.SeedBrokers(config.Servers...), kgo.AllowAutoTopicCreation(), kgo.WithLogger(kslog.New(slog.Default())), // TODO use logger.LoggerFromCtx - kgo.SoftwareNameAndVersion("peerdb", peerdbenv.PeerDBVersionShaShort()), ) if !config.DisableTls { optionalOpts = append(optionalOpts, kgo.DialTLSConfig(&tls.Config{MinVersion: tls.VersionTLS12})) @@ -72,6 +74,11 @@ func NewKafkaConnector( return nil, fmt.Errorf("unsupported SASL mechanism: %s", config.Sasl) } } + force, err := peerdbenv.PeerDBQueueForceTopicCreation(ctx) + if err == nil && force { + optionalOpts = append(optionalOpts, kgo.UnknownTopicRetries(0)) + } + client, err := kgo.NewClient(optionalOpts...) 
if err != nil { return nil, fmt.Errorf("failed to create kafka client: %w", err) @@ -197,13 +204,36 @@ func (c *KafkaConnector) createPool( recordCounter := atomic.Int32{} recordCounter.Store(lenRecords) for _, kr := range result.records { - c.client.Produce(ctx, kr, func(_ *kgo.Record, err error) { + var handler func(*kgo.Record, error) + handler = func(_ *kgo.Record, err error) { if err != nil { - queueErr(err) + var success bool + if errors.Is(err, kerr.UnknownTopicOrPartition) { + force, envErr := peerdbenv.PeerDBQueueForceTopicCreation(ctx) + if envErr == nil && force { + c.logger.Info("[kafka] force topic creation", slog.String("topic", kr.Topic)) + _, err := kadm.NewClient(c.client).CreateTopic(ctx, 1, 3, nil, kr.Topic) + if err != nil && !errors.Is(err, kerr.TopicAlreadyExists) { + c.logger.Warn("[kafka] topic create error", slog.Any("error", err)) + queueErr(err) + return + } + success = true + } + } else { + c.logger.Warn("[kafka] produce error", slog.Any("error", err)) + } + if success { + time.Sleep(time.Second) // topic creation can take time to propagate, throttle + c.client.Produce(ctx, kr, handler) + } else { + queueErr(err) + } } else if recordCounter.Add(-1) == 0 && lastSeenLSN != nil { shared.AtomicInt64Max(lastSeenLSN, result.lsn) } - }) + } + c.client.Produce(ctx, kr, handler) } } }) diff --git a/flow/connectors/pubsub/pubsub.go b/flow/connectors/pubsub/pubsub.go index 7bfe41a5e2..011b4f512d 100644 --- a/flow/connectors/pubsub/pubsub.go +++ b/flow/connectors/pubsub/pubsub.go @@ -157,14 +157,20 @@ func (c *PubSubConnector) createPool( topicClient.EnableMessageOrdering = true } - exists, err := topicClient.Exists(ctx) - if err != nil { - return nil, fmt.Errorf("error checking if topic exists: %w", err) + force, envErr := peerdbenv.PeerDBQueueForceTopicCreation(ctx) + if envErr != nil { + return nil, envErr } - if !exists { - topicClient, err = c.client.CreateTopic(ctx, message.Topic) + if force { + exists, err := topicClient.Exists(ctx) if err != 
nil { - return nil, fmt.Errorf("error creating topic: %w", err) + return nil, fmt.Errorf("error checking if topic exists: %w", err) + } + if !exists { + topicClient, err = c.client.CreateTopic(ctx, message.Topic) + if err != nil { + return nil, fmt.Errorf("error creating topic: %w", err) + } } } return topicClient, nil diff --git a/flow/e2e/pubsub/pubsub_test.go b/flow/e2e/pubsub/pubsub_test.go index df6809b819..714acff0b3 100644 --- a/flow/e2e/pubsub/pubsub_test.go +++ b/flow/e2e/pubsub/pubsub_test.go @@ -159,7 +159,6 @@ func (s PubSubSuite) TestCreateTopic() { require.NoError(s.t, err) topic := psclient.Topic(flowName) exists, err := topic.Exists(context.Background()) - s.t.Log("WWWW exists", exists) require.NoError(s.t, err) return exists }) diff --git a/flow/go.mod b/flow/go.mod index c7109374af..f16ccf1f1c 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -45,6 +45,7 @@ require ( github.com/snowflakedb/gosnowflake v1.10.1 github.com/stretchr/testify v1.9.0 github.com/twmb/franz-go v1.17.0 + github.com/twmb/franz-go/pkg/kadm v1.12.0 github.com/twmb/franz-go/plugin/kslog v1.0.0 github.com/twpayne/go-geos v0.17.1 github.com/urfave/cli/v3 v3.0.0-alpha9 diff --git a/flow/go.sum b/flow/go.sum index 6ec593ec40..5b8b6c3a07 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -389,6 +389,8 @@ github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8 github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/twmb/franz-go v1.17.0 h1:hawgCx5ejDHkLe6IwAtFWwxi3OU4OztSTl7ZV5rwkYk= github.com/twmb/franz-go v1.17.0/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM= +github.com/twmb/franz-go/pkg/kadm v1.12.0 h1:I8P/gpXFzhl73QcAYmJu+1fOXvrynyH/MAotr2udEg4= +github.com/twmb/franz-go/pkg/kadm v1.12.0/go.mod h1:VMvpfjz/szpH9WB+vGM+rteTzVv0djyHFimci9qm2C0= github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA= github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod 
h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU= github.com/twmb/franz-go/plugin/kslog v1.0.0 h1:I64oEmF+0PDvmyLgwrlOtg4mfpSE9GwlcLxM4af2t60= diff --git a/flow/peerdbenv/dynamicconf.go b/flow/peerdbenv/dynamicconf.go index b43c7a6964..c90d1b8751 100644 --- a/flow/peerdbenv/dynamicconf.go +++ b/flow/peerdbenv/dynamicconf.go @@ -3,6 +3,7 @@ package peerdbenv import ( "context" "fmt" + "log/slog" "os" "strconv" "time" @@ -17,7 +18,7 @@ import ( func dynLookup(ctx context.Context, key string) (string, error) { conn, err := GetCatalogConnectionPoolFromEnv(ctx) if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err) + logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool", slog.Any("error", err)) return "", fmt.Errorf("failed to get catalog connection pool: %w", err) } @@ -31,7 +32,7 @@ func dynLookup(ctx context.Context, key string) (string, error) { return val, nil } } - logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err) + logger.LoggerFromCtx(ctx).Error("Failed to get key", slog.Any("error", err)) return "", fmt.Errorf("failed to get key: %w", err) } if !value.Valid { @@ -57,7 +58,7 @@ func dynamicConfSigned[T constraints.Signed](ctx context.Context, key string) (T return strconv.ParseInt(value, 10, 64) }) if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to parse as int64: %v", err) + logger.LoggerFromCtx(ctx).Error("Failed to parse as int64", slog.Any("error", err)) return 0, fmt.Errorf("failed to parse as int64: %w", err) } @@ -69,8 +70,8 @@ func dynamicConfUnsigned[T constraints.Unsigned](ctx context.Context, key string return strconv.ParseUint(value, 10, 64) }) if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to parse as int64: %v", err) - return 0, fmt.Errorf("failed to parse as int64: %w", err) + logger.LoggerFromCtx(ctx).Error("Failed to parse as uint64", slog.Any("error", err)) + return 0, fmt.Errorf("failed to parse as uint64: %w", err) } return T(value), nil @@ -79,7 
+80,7 @@ func dynamicConfUnsigned[T constraints.Unsigned](ctx context.Context, key string func dynamicConfBool(ctx context.Context, key string) (bool, error) { value, err := dynLookupConvert(ctx, key, strconv.ParseBool) if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to parse bool: %v", err) + logger.LoggerFromCtx(ctx).Error("Failed to parse bool", slog.Any("error", err)) return false, fmt.Errorf("failed to parse bool: %w", err) } @@ -112,3 +113,9 @@ func PeerDBOpenConnectionsAlertThreshold(ctx context.Context) (uint32, error) { func PeerDBBigQueryEnableSyncedAtPartitioning(ctx context.Context) (bool, error) { return dynamicConfBool(ctx, "PEERDB_BIGQUERY_ENABLE_SYNCED_AT_PARTITIONING_BY_DAYS") } + +// Kafka has topic auto create as an option, auto.create.topics.enable +// But non-dedicated cluster maybe can't set config, may want peerdb to create topic. Similar for PubSub +func PeerDBQueueForceTopicCreation(ctx context.Context) (bool, error) { + return dynamicConfBool(ctx, "PEERDB_QUEUE_FORCE_TOPIC_CREATION") +} From 0d0cf3e343c493767ac3b90af5a0de6f2aaf18c1 Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Fri, 7 Jun 2024 02:48:19 +0530 Subject: [PATCH 12/15] dynconf pt.2: more dynconf and UI improvements (#1796) --- flow/activities/flowable.go | 18 +++-- flow/activities/flowable_core.go | 6 +- flow/connectors/clickhouse/clickhouse.go | 5 +- .../connelasticsearch/elasticsearch.go | 9 ++- flow/connectors/eventhub/eventhub.go | 6 +- flow/connectors/kafka/kafka.go | 14 +++- flow/connectors/postgres/cdc.go | 5 +- flow/connectors/pubsub/pubsub.go | 14 +++- flow/connectors/snowflake/snowflake.go | 6 +- flow/connectors/utils/cdc_store.go | 17 +++-- flow/connectors/utils/cdc_store_test.go | 21 +++--- flow/connectors/utils/lua.go | 6 +- flow/peerdbenv/config.go | 53 --------------- flow/peerdbenv/dynamicconf.go | 44 +++++++++++++ flow/peerdbenv/env.go | 16 ----- flow/workflows/cdc_flow.go | 4 +- 
flow/workflows/local_activities.go | 25 +++++++ flow/workflows/normalize_flow.go | 5 +- flow/workflows/scheduled_flows.go | 21 ++---- flow/workflows/sync_flow.go | 5 +- .../migrations/V30__more_dynconf_settings.sql | 18 +++++ ui/app/settings/page.tsx | 66 ++++++++++++++++--- ui/tsconfig.json | 2 +- 23 files changed, 248 insertions(+), 138 deletions(-) create mode 100644 flow/workflows/local_activities.go create mode 100644 nexus/catalog/migrations/V30__more_dynconf_settings.sql diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 844c60c088..14517b09cd 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -570,19 +570,27 @@ func (a *FlowableActivity) DropFlowDestination(ctx context.Context, config *prot func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error { logger := activity.GetLogger(ctx) - if !peerdbenv.PeerDBEnableWALHeartbeat() { + walHeartbeatEnabled, err := peerdbenv.PeerDBEnableWALHeartbeat(ctx) + if err != nil { + logger.Warn("unable to fetch wal heartbeat config. Skipping walheartbeat send.", slog.Any("error", err)) + return err + } + if !walHeartbeatEnabled { logger.Info("wal heartbeat is disabled") return nil } + walHeartbeatStatement, err := peerdbenv.PeerDBWALHeartbeatQuery(ctx) + if err != nil { + logger.Warn("unable to fetch wal heartbeat config. Skipping walheartbeat send.", slog.Any("error", err)) + return err + } pgPeers, err := a.getPostgresPeerConfigs(ctx) if err != nil { - logger.Warn("[sendwalheartbeat] unable to fetch peers. " + - "Skipping walheartbeat send. Error: " + err.Error()) + logger.Warn("[sendwalheartbeat] unable to fetch peers. 
Skipping walheartbeat send.", slog.Any("error", err)) return err } - command := peerdbenv.PeerDBWALHeartbeatQuery() // run above command for each Postgres peer for _, pgPeer := range pgPeers { activity.RecordHeartbeat(ctx, pgPeer.Name) @@ -599,7 +607,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error { return } defer pgConn.Close() - cmdErr := pgConn.ExecuteCommand(ctx, command) + cmdErr := pgConn.ExecuteCommand(ctx, walHeartbeatStatement) if cmdErr != nil { logger.Warn(fmt.Sprintf("could not send walheartbeat to peer %v: %v", pgPeer.Name, cmdErr)) } diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 24a98bdca6..ce74f4c3da 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -118,7 +118,11 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon consumedOffset := atomic.Int64{} consumedOffset.Store(lastOffset) - recordBatchPull := model.NewCDCStream[Items](peerdbenv.PeerDBCDCChannelBufferSize()) + channelBufferSize, err := peerdbenv.PeerDBCDCChannelBufferSize(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get CDC channel buffer size: %w", err) + } + recordBatchPull := model.NewCDCStream[Items](int(channelBufferSize)) recordBatchSync := recordBatchPull if adaptStream != nil { var err error diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 18dc75e64b..e089c88b12 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -115,7 +115,10 @@ func NewClickhouseConnector( bucketPathSuffix := fmt.Sprintf("%s/%s", url.PathEscape(deploymentUID), url.PathEscape(flowName)) // Fallback: Get S3 credentials from environment - awsBucketName := peerdbenv.PeerDBClickhouseAWSS3BucketName() + awsBucketName, err := peerdbenv.PeerDBClickhouseAWSS3BucketName(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get PeerDB Clickhouse Bucket Name: %w", err) 
+ } if awsBucketName == "" { return nil, errors.New("PeerDB Clickhouse Bucket Name not set") } diff --git a/flow/connectors/connelasticsearch/elasticsearch.go b/flow/connectors/connelasticsearch/elasticsearch.go index f9f6c1819f..e0ed2ef270 100644 --- a/flow/connectors/connelasticsearch/elasticsearch.go +++ b/flow/connectors/connelasticsearch/elasticsearch.go @@ -146,10 +146,13 @@ func (esc *ElasticsearchConnector) SyncRecords(ctx context.Context, defer cacheCloser() flushLoopDone := make(chan struct{}) - // we only update lastSeenLSN in the OnSuccess call, so this should be safe even if race - // between loop breaking and closing flushLoopDone go func() { - ticker := time.NewTicker(peerdbenv.PeerDBQueueFlushTimeoutSeconds()) + flushTimeout, err := peerdbenv.PeerDBQueueFlushTimeoutSeconds(ctx) + if err != nil { + esc.logger.Warn("[elasticsearch] failed to get flush timeout, no periodic flushing", slog.Any("error", err)) + return + } + ticker := time.NewTicker(flushTimeout) defer ticker.Stop() for { diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 1182f4d413..0877f256f7 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -184,7 +184,11 @@ func (c *EventHubConnector) processBatch( batchPerTopic := NewHubBatches(c.hubManager) toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns, false) - ticker := time.NewTicker(peerdbenv.PeerDBQueueFlushTimeoutSeconds()) + flushTimeout, err := peerdbenv.PeerDBQueueFlushTimeoutSeconds(ctx) + if err != nil { + return 0, fmt.Errorf("failed to get flush timeout: %w", err) + } + ticker := time.NewTicker(flushTimeout) defer ticker.Stop() lastSeenLSN := int64(0) diff --git a/flow/connectors/kafka/kafka.go b/flow/connectors/kafka/kafka.go index 816c08c46d..1604927dcb 100644 --- a/flow/connectors/kafka/kafka.go +++ b/flow/connectors/kafka/kafka.go @@ -183,7 +183,12 @@ func (c *KafkaConnector) createPool( lastSeenLSN *atomic.Int64, queueErr 
func(error), ) (*utils.LPool[poolResult], error) { - return utils.LuaPool(func() (*lua.LState, error) { + maxSize, err := peerdbenv.PeerDBQueueParallelism(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get parallelism: %w", err) + } + + return utils.LuaPool(int(maxSize), func() (*lua.LState, error) { ls, err := utils.LoadScript(ctx, script, utils.LuaPrintFn(func(s string) { _ = c.LogFlowInfo(ctx, flowJobName, s) })) @@ -253,7 +258,12 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings) flushLoopDone := make(chan struct{}) go func() { - ticker := time.NewTicker(peerdbenv.PeerDBQueueFlushTimeoutSeconds()) + flushTimeout, err := peerdbenv.PeerDBQueueFlushTimeoutSeconds(ctx) + if err != nil { + c.logger.Warn("[kafka] failed to get flush timeout, no periodic flushing", slog.Any("error", err)) + return + } + ticker := time.NewTicker(flushTimeout) defer ticker.Stop() for { diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index cfb1664bf5..1c39cf77bc 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -312,7 +312,10 @@ func PullCdcRecords[Items model.Items]( } var standByLastLogged time.Time - cdcRecordsStorage := utils.NewCDCStore[Items](p.flowJobName) + cdcRecordsStorage, err := utils.NewCDCStore[Items](ctx, p.flowJobName) + if err != nil { + return err + } defer func() { if cdcRecordsStorage.IsEmpty() { records.SignalAsEmpty() diff --git a/flow/connectors/pubsub/pubsub.go b/flow/connectors/pubsub/pubsub.go index 011b4f512d..f097424dc9 100644 --- a/flow/connectors/pubsub/pubsub.go +++ b/flow/connectors/pubsub/pubsub.go @@ -138,7 +138,12 @@ func (c *PubSubConnector) createPool( publish chan<- publishResult, queueErr func(error), ) (*utils.LPool[poolResult], error) { - return utils.LuaPool(func() (*lua.LState, error) { + maxSize, err := peerdbenv.PeerDBQueueParallelism(ctx) + if err != nil { + 
return nil, fmt.Errorf("failed to get parallelism: %w", err) + } + + return utils.LuaPool(int(maxSize), func() (*lua.LState, error) { ls, err := utils.LoadScript(ctx, script, utils.LuaPrintFn(func(s string) { _ = c.LogFlowInfo(ctx, flowJobName, s) })) @@ -268,7 +273,12 @@ func (c *PubSubConnector) SyncRecords(ctx context.Context, req *model.SyncRecord flushLoopDone := make(chan struct{}) go func() { - ticker := time.NewTicker(peerdbenv.PeerDBQueueFlushTimeoutSeconds()) + flushTimeout, err := peerdbenv.PeerDBQueueFlushTimeoutSeconds(ctx) + if err != nil { + c.logger.Warn("[pubsub] failed to get flush timeout, no periodic flushing", slog.Any("error", err)) + return + } + ticker := time.NewTicker(flushTimeout) defer ticker.Stop() for { diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 6a179591e3..58522b207c 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -535,7 +535,11 @@ func (c *SnowflakeConnector) mergeTablesForBatch( var totalRowsAffected int64 = 0 g, gCtx := errgroup.WithContext(ctx) - g.SetLimit(peerdbenv.PeerDBSnowflakeMergeParallelism()) + mergeParallelism, err := peerdbenv.PeerDBSnowflakeMergeParallelism(ctx) + if err != nil { + return fmt.Errorf("failed to get merge parallelism: %w", err) + } + g.SetLimit(int(mergeParallelism)) mergeGen := &mergeStmtGenerator{ rawTableName: getRawTableIdentifier(flowName), diff --git a/flow/connectors/utils/cdc_store.go b/flow/connectors/utils/cdc_store.go index a98f10c656..4de95db70f 100644 --- a/flow/connectors/utils/cdc_store.go +++ b/flow/connectors/utils/cdc_store.go @@ -2,6 +2,7 @@ package utils import ( "bytes" + "context" "encoding/gob" "errors" "fmt" @@ -43,16 +44,24 @@ type cdcStore[Items model.Items] struct { numRecordsSwitchThreshold int } -func NewCDCStore[Items model.Items](flowJobName string) *cdcStore[Items] { +func NewCDCStore[Items model.Items](ctx context.Context, flowJobName string) (*cdcStore[Items], 
error) { + numRecordsSwitchThreshold, err := peerdbenv.PeerDBCDCDiskSpillRecordsThreshold(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get CDC disk spill records threshold: %w", err) + } + memPercent, err := peerdbenv.PeerDBCDCDiskSpillMemPercentThreshold(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get CDC disk spill memory percent threshold: %w", err) + } + return &cdcStore[Items]{ inMemoryRecords: make(map[model.TableWithPkey]model.Record[Items]), pebbleDB: nil, numRecords: atomic.Int32{}, flowJobName: flowJobName, dbFolderName: fmt.Sprintf("%s/%s_%s", os.TempDir(), flowJobName, shared.RandomString(8)), - numRecordsSwitchThreshold: peerdbenv.PeerDBCDCDiskSpillRecordsThreshold(), + numRecordsSwitchThreshold: int(numRecordsSwitchThreshold), memThresholdBytes: func() uint64 { - memPercent := peerdbenv.PeerDBCDCDiskSpillMemPercentThreshold() maxMemBytes := peerdbenv.PeerDBFlowWorkerMaxMemBytes() if memPercent > 0 && maxMemBytes > 0 { return maxMemBytes * uint64(memPercent) / 100 @@ -61,7 +70,7 @@ func NewCDCStore[Items model.Items](flowJobName string) *cdcStore[Items] { }(), thresholdReason: "", memStats: []metrics.Sample{{Name: "/memory/classes/heap/objects:bytes"}}, - } + }, nil } func init() { diff --git a/flow/connectors/utils/cdc_store_test.go b/flow/connectors/utils/cdc_store_test.go index ab6560f3d1..a278693dfb 100644 --- a/flow/connectors/utils/cdc_store_test.go +++ b/flow/connectors/utils/cdc_store_test.go @@ -1,6 +1,7 @@ package utils import ( + "context" "crypto/rand" "log/slog" "testing" @@ -67,11 +68,12 @@ func genKeyAndRec(t *testing.T) (model.TableWithPkey, model.Record[model.RecordI func TestSingleRecord(t *testing.T) { t.Parallel() - cdcRecordsStore := NewCDCStore[model.RecordItems]("test_single_record") + cdcRecordsStore, err := NewCDCStore[model.RecordItems](context.Background(), "test_single_record") + require.NoError(t, err) cdcRecordsStore.numRecordsSwitchThreshold = 10 key, rec := genKeyAndRec(t) - err := 
cdcRecordsStore.Set(slog.Default(), key, rec) + err = cdcRecordsStore.Set(slog.Default(), key, rec) require.NoError(t, err) // should not spill into DB require.Len(t, cdcRecordsStore.inMemoryRecords, 1) @@ -87,7 +89,8 @@ func TestSingleRecord(t *testing.T) { func TestRecordsTillSpill(t *testing.T) { t.Parallel() - cdcRecordsStore := NewCDCStore[model.RecordItems]("test_records_till_spill") + cdcRecordsStore, err := NewCDCStore[model.RecordItems](context.Background(), "test_records_till_spill") + require.NoError(t, err) cdcRecordsStore.numRecordsSwitchThreshold = 10 // add records upto set limit @@ -101,7 +104,7 @@ func TestRecordsTillSpill(t *testing.T) { // this record should be spilled to DB key, rec := genKeyAndRec(t) - err := cdcRecordsStore.Set(slog.Default(), key, rec) + err = cdcRecordsStore.Set(slog.Default(), key, rec) require.NoError(t, err) _, ok := cdcRecordsStore.inMemoryRecords[key] require.False(t, ok) @@ -118,11 +121,12 @@ func TestRecordsTillSpill(t *testing.T) { func TestTimeAndDecimalEncoding(t *testing.T) { t.Parallel() - cdcRecordsStore := NewCDCStore[model.RecordItems]("test_time_encoding") + cdcRecordsStore, err := NewCDCStore[model.RecordItems](context.Background(), "test_time_encoding") + require.NoError(t, err) cdcRecordsStore.numRecordsSwitchThreshold = 0 key, rec := genKeyAndRec(t) - err := cdcRecordsStore.Set(slog.Default(), key, rec) + err = cdcRecordsStore.Set(slog.Default(), key, rec) require.NoError(t, err) retreived, ok, err := cdcRecordsStore.Get(key) @@ -139,11 +143,12 @@ func TestTimeAndDecimalEncoding(t *testing.T) { func TestNullKeyDoesntStore(t *testing.T) { t.Parallel() - cdcRecordsStore := NewCDCStore[model.RecordItems]("test_time_encoding") + cdcRecordsStore, err := NewCDCStore[model.RecordItems](context.Background(), "test_time_encoding") + require.NoError(t, err) cdcRecordsStore.numRecordsSwitchThreshold = 0 key, rec := genKeyAndRec(t) - err := cdcRecordsStore.Set(slog.Default(), model.TableWithPkey{}, rec) + err = 
cdcRecordsStore.Set(slog.Default(), model.TableWithPkey{}, rec) require.NoError(t, err) retreived, ok, err := cdcRecordsStore.Get(key) diff --git a/flow/connectors/utils/lua.go b/flow/connectors/utils/lua.go index f1d82f373f..24c37394d7 100644 --- a/flow/connectors/utils/lua.go +++ b/flow/connectors/utils/lua.go @@ -5,12 +5,11 @@ import ( "fmt" "strings" - "github.com/yuin/gopher-lua" + lua "github.com/yuin/gopher-lua" "github.com/PeerDB-io/gluaflatbuffers" "github.com/PeerDB-io/gluajson" "github.com/PeerDB-io/peer-flow/model" - "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/pua" "github.com/PeerDB-io/peer-flow/shared" ) @@ -109,8 +108,7 @@ type LPool[T any] struct { closed bool } -func LuaPool[T any](cons func() (*lua.LState, error), merge func(T)) (*LPool[T], error) { - maxSize := peerdbenv.PeerDBQueueParallelism() +func LuaPool[T any](maxSize int, cons func() (*lua.LState, error), merge func(T)) (*LPool[T], error) { returns := make(chan (<-chan T), maxSize) wait := make(chan struct{}) go func() { diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index bcf1ac050e..6c7501b0a3 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -29,21 +29,6 @@ func PeerFlowTaskQueueName(taskQueueID shared.TaskQueueID) string { return fmt.Sprintf("%s-%s", deploymentUID, taskQueueID) } -// PEERDB_CDC_CHANNEL_BUFFER_SIZE -func PeerDBCDCChannelBufferSize() int { - return getEnvInt("PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18) -} - -// PEERDB_QUEUE_FLUSH_TIMEOUT_SECONDS -func PeerDBQueueFlushTimeoutSeconds() time.Duration { - x := getEnvInt("PEERDB_QUEUE_FLUSH_TIMEOUT_SECONDS", 10) - return time.Duration(x) * time.Second -} - -func PeerDBQueueParallelism() int { - return getEnvInt("PEERDB_QUEUE_PARALLELISM", 4) -} - // env variable doesn't exist anymore, but tests appear to depend on this // in lieu of an actual value of IdleTimeoutSeconds func PeerDBCDCIdleTimeoutSeconds(providedValue int) time.Duration { @@ -56,16 +41,6 @@ func 
PeerDBCDCIdleTimeoutSeconds(providedValue int) time.Duration { return time.Duration(x) * time.Second } -// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD -func PeerDBCDCDiskSpillRecordsThreshold() int { - return getEnvInt("PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000) -} - -// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD, negative numbers means memory threshold disabled -func PeerDBCDCDiskSpillMemPercentThreshold() int { - return getEnvInt("PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1) -} - // GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum func PeerDBFlowWorkerMaxMemBytes() uint64 { return getEnvUint[uint64]("GOMEMLIMIT", 0) @@ -96,29 +71,6 @@ func PeerDBCatalogDatabase() string { return GetEnvString("PEERDB_CATALOG_DATABASE", "") } -// PEERDB_ENABLE_WAL_HEARTBEAT -func PeerDBEnableWALHeartbeat() bool { - return getEnvBool("PEERDB_ENABLE_WAL_HEARTBEAT", false) -} - -// PEERDB_WAL_HEARTBEAT_QUERY -func PeerDBWALHeartbeatQuery() string { - return GetEnvString("PEERDB_WAL_HEARTBEAT_QUERY", `BEGIN; -DROP AGGREGATE IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4); -CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4); -DROP AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4); -END;`) -} - -// PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE -func PeerDBEnableParallelSyncNormalize() bool { - return getEnvBool("PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false) -} - -func PeerDBSnowflakeMergeParallelism() int { - return getEnvInt("PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8) -} - // PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN func PeerDBTelemetryAWSSNSTopicArn() string { return GetEnvString("PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN", "") @@ -140,8 +92,3 @@ func PeerDBAlertingEmailSenderRegion() string { func PeerDBAlertingEmailSenderReplyToAddresses() string { return GetEnvString("PEERDB_ALERTING_EMAIL_SENDER_REPLY_TO_ADDRESSES", "") } - -// PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME -func PeerDBClickhouseAWSS3BucketName() string { - 
return GetEnvString("PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME", "") -} diff --git a/flow/peerdbenv/dynamicconf.go b/flow/peerdbenv/dynamicconf.go index c90d1b8751..928147d4ab 100644 --- a/flow/peerdbenv/dynamicconf.go +++ b/flow/peerdbenv/dynamicconf.go @@ -114,6 +114,50 @@ func PeerDBBigQueryEnableSyncedAtPartitioning(ctx context.Context) (bool, error) return dynamicConfBool(ctx, "PEERDB_BIGQUERY_ENABLE_SYNCED_AT_PARTITIONING_BY_DAYS") } +func PeerDBCDCChannelBufferSize(ctx context.Context) (int64, error) { + return dynamicConfSigned[int64](ctx, "PEERDB_CDC_CHANNEL_BUFFER_SIZE") +} + +func PeerDBQueueFlushTimeoutSeconds(ctx context.Context) (time.Duration, error) { + x, err := dynamicConfSigned[int64](ctx, "PEERDB_QUEUE_FLUSH_TIMEOUT_SECONDS") + if err != nil { + return 0, err + } + return time.Duration(x) * time.Second, nil +} + +func PeerDBQueueParallelism(ctx context.Context) (int64, error) { + return dynamicConfSigned[int64](ctx, "PEERDB_QUEUE_PARALLELISM") +} + +func PeerDBCDCDiskSpillRecordsThreshold(ctx context.Context) (int64, error) { + return dynamicConfSigned[int64](ctx, "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD") +} + +func PeerDBCDCDiskSpillMemPercentThreshold(ctx context.Context) (int64, error) { + return dynamicConfSigned[int64](ctx, "PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD") +} + +func PeerDBEnableWALHeartbeat(ctx context.Context) (bool, error) { + return dynamicConfBool(ctx, "PEERDB_ENABLE_WAL_HEARTBEAT") +} + +func PeerDBWALHeartbeatQuery(ctx context.Context) (string, error) { + return dynLookup(ctx, "PEERDB_WAL_HEARTBEAT_QUERY") +} + +func PeerDBEnableParallelSyncNormalize(ctx context.Context) (bool, error) { + return dynamicConfBool(ctx, "PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE") +} + +func PeerDBSnowflakeMergeParallelism(ctx context.Context) (int64, error) { + return dynamicConfSigned[int64](ctx, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM") +} + +func PeerDBClickhouseAWSS3BucketName(ctx context.Context) (string, error) { + return dynLookup(ctx, 
"PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME") +} + // Kafka has topic auto create as an option, auto.create.topics.enable // But non-dedicated cluster maybe can't set config, may want peerdb to create topic. Similar for PubSub func PeerDBQueueForceTopicCreation(ctx context.Context) (bool, error) { diff --git a/flow/peerdbenv/env.go b/flow/peerdbenv/env.go index 27ef13cbc0..790fe84f67 100644 --- a/flow/peerdbenv/env.go +++ b/flow/peerdbenv/env.go @@ -41,22 +41,6 @@ func getEnvUint[T constraints.Unsigned](name string, defaultValue T) T { return T(i) } -// getEnvBool returns the value of the environment variable with the given name -// or defaultValue if the environment variable is not set or is not a valid value. -func getEnvBool(name string, defaultValue bool) bool { - val, ok := os.LookupEnv(name) - if !ok { - return defaultValue - } - - b, err := strconv.ParseBool(val) - if err != nil { - return defaultValue - } - - return b -} - // GetEnvString returns the value of the environment variable with the given name // or defaultValue if the environment variable is not set. 
func GetEnvString(name string, defaultValue string) string { diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 51ccaf7f5f..a01a2febc5 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -502,9 +502,7 @@ func CDCFlowWorkflow( maps.Copy(state.SyncFlowOptions.TableNameSchemaMapping, payload.TableNameSchemaMapping) }) - parallel := GetSideEffect(ctx, func(_ workflow.Context) bool { - return peerdbenv.PeerDBEnableParallelSyncNormalize() - }) + parallel := getParallelSyncNormalize(ctx, logger) if !parallel { normDoneChan := model.NormalizeDoneSignal.GetSignalChannel(ctx) normDoneChan.Drain() diff --git a/flow/workflows/local_activities.go b/flow/workflows/local_activities.go new file mode 100644 index 0000000000..c5a25c8ea4 --- /dev/null +++ b/flow/workflows/local_activities.go @@ -0,0 +1,25 @@ +package peerflow + +import ( + "log/slog" + "time" + + "go.temporal.io/sdk/log" + "go.temporal.io/sdk/workflow" + + "github.com/PeerDB-io/peer-flow/peerdbenv" +) + +func getParallelSyncNormalize(wCtx workflow.Context, logger log.Logger) bool { + checkCtx := workflow.WithLocalActivityOptions(wCtx, workflow.LocalActivityOptions{ + StartToCloseTimeout: time.Minute, + }) + + getParallelFuture := workflow.ExecuteLocalActivity(checkCtx, peerdbenv.PeerDBEnableParallelSyncNormalize) + var parallel bool + if err := getParallelFuture.Get(checkCtx, ¶llel); err != nil { + logger.Warn("Failed to get status of parallel sync-normalize", slog.Any("error", err)) + return false + } + return parallel +} diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index e4ed09e072..0fa039e509 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -9,7 +9,6 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" - "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" ) @@ -108,9 +107,7 @@ func NormalizeFlowWorkflow( } if 
ctx.Err() == nil && !state.Stop { - parallel := GetSideEffect(ctx, func(_ workflow.Context) bool { - return peerdbenv.PeerDBEnableParallelSyncNormalize() - }) + parallel := getParallelSyncNormalize(ctx, logger) if !parallel { _ = model.NormalizeDoneSignal.SignalExternalWorkflow( diff --git a/flow/workflows/scheduled_flows.go b/flow/workflows/scheduled_flows.go index 61db3d0262..7aae7d7c70 100644 --- a/flow/workflows/scheduled_flows.go +++ b/flow/workflows/scheduled_flows.go @@ -5,8 +5,6 @@ import ( "go.temporal.io/api/enums/v1" "go.temporal.io/sdk/workflow" - - "github.com/PeerDB-io/peer-flow/peerdbenv" ) // RecordSlotSizeWorkflow monitors replication slot size @@ -47,18 +45,13 @@ func withCronOptions(ctx workflow.Context, workflowID string, cron string) workf func GlobalScheduleManagerWorkflow(ctx workflow.Context) error { info := workflow.GetInfo(ctx) - walHeartbeatEnabled := GetSideEffect(ctx, func(_ workflow.Context) bool { - return peerdbenv.PeerDBEnableWALHeartbeat() - }) - if walHeartbeatEnabled { - heartbeatCtx := withCronOptions(ctx, - "wal-heartbeat-"+info.OriginalRunID, - "*/12 * * * *") - workflow.ExecuteChildWorkflow( - heartbeatCtx, - HeartbeatFlowWorkflow, - ) - } + heartbeatCtx := withCronOptions(ctx, + "wal-heartbeat-"+info.OriginalRunID, + "*/12 * * * *") + workflow.ExecuteChildWorkflow( + heartbeatCtx, + HeartbeatFlowWorkflow, + ) slotSizeCtx := withCronOptions(ctx, "record-slot-size-"+info.OriginalRunID, diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 711928ef5c..f126b721d2 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -11,7 +11,6 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" - "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" ) @@ -77,9 +76,7 @@ func SyncFlowWorkflow( }) var waitSelector workflow.Selector - parallel := GetSideEffect(ctx, func(_ workflow.Context) bool { - return 
peerdbenv.PeerDBEnableParallelSyncNormalize() - }) + parallel := getParallelSyncNormalize(ctx, logger) if !parallel { waitSelector = workflow.NewNamedSelector(ctx, "NormalizeWait") waitSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {}) diff --git a/nexus/catalog/migrations/V30__more_dynconf_settings.sql b/nexus/catalog/migrations/V30__more_dynconf_settings.sql new file mode 100644 index 0000000000..71656d8922 --- /dev/null +++ b/nexus/catalog/migrations/V30__more_dynconf_settings.sql @@ -0,0 +1,18 @@ +INSERT INTO dynamic_settings (config_name,config_default_value,config_value_type,config_description,config_apply_mode) +VALUES +('PEERDB_CDC_CHANNEL_BUFFER_SIZE','262144',2,'Advanced setting: changes buffer size of channel PeerDB uses while streaming rows read to destination in CDC',1), +('PEERDB_QUEUE_FLUSH_TIMEOUT_SECONDS','10',2,'Frequency of flushing to queue, applicable for PeerDB Streams mirrors only',1), +('PEERDB_QUEUE_PARALLELISM','4',2,'Parallelism for Lua script processing data, applicable for CDC mirrors to Kakfa and PubSub',1), +('PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD','1000000',2,'CDC: number of records beyond which records are written to disk instead',1), +('PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD','-1',2,'CDC: worker memory usage (in %) beyond which records are written to disk instead, -1 disables',1), +('PEERDB_ENABLE_WAL_HEARTBEAT','false',4,'enables WAL heartbeat to prevent replication slot lag from increasing during times of no activity',1), +('PEERDB_WAL_HEARTBEAT_QUERY','BEGIN; +DROP AGGREGATE IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4); +CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4); +DROP AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4); +END;',1,'SQL statement to run during each WAL heartbeat',1), +('PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE','false',4,'Advanced setting: enables experimental parallel sync (moving rows to target) and normalize (updating rows in target table)',2), 
+('PEERDB_SNOWFLAKE_MERGE_PARALLELISM','8',2,'Number of MERGE statements to run in parallel, applies to CDC mirrors with Snowflake targets. -1 means no limit',1), +('PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME','',1,'S3 buckets to store Avro files for mirrors with ClickHouse target',1), +('PEERDB_QUEUE_FORCE_TOPIC_CREATION','false',4,'Force auto topic creation in mirrors, applies to Kafka and PubSub mirrors',4) + ON CONFLICT DO NOTHING; diff --git a/ui/app/settings/page.tsx b/ui/app/settings/page.tsx index 18af3ff63f..e84ddd31f2 100644 --- a/ui/app/settings/page.tsx +++ b/ui/app/settings/page.tsx @@ -1,6 +1,6 @@ 'use client'; -import { DynconfApplyMode } from '@/grpc_generated/flow'; +import { DynconfApplyMode, DynconfValueType } from '@/grpc_generated/flow'; import { Button } from '@/lib/Button'; import { Icon } from '@/lib/Icon'; import { Label } from '@/lib/Label'; @@ -11,6 +11,9 @@ import { Tooltip } from '@/lib/Tooltip'; import { dynamic_settings } from '@prisma/client'; import { MaterialSymbol } from 'material-symbols'; import { useEffect, useMemo, useState } from 'react'; +import { ToastContainer } from 'react-toastify'; +import 'react-toastify/dist/ReactToastify.css'; +import { notifyErr } from '../utils/notify'; const ROWS_PER_PAGE = 7; @@ -65,7 +68,48 @@ const DynamicSettingItem = ({ setEditMode(true); }; + const validateNewValue = (): boolean => { + const notNullValue = newValue ?? ''; + if (setting.config_value_type === DynconfValueType.INT) { + const a = parseInt(Number(notNullValue).toString()); + if ( + isNaN(a) || + a > Number.MAX_SAFE_INTEGER || + a < Number.MIN_SAFE_INTEGER + ) { + notifyErr('Invalid value. Please enter a valid 64-bit signed integer.'); + return false; + } + return true; + } else if (setting.config_value_type === DynconfValueType.UINT) { + const a = parseInt(Number(notNullValue).toString()); + if (isNaN(a) || a > Number.MAX_SAFE_INTEGER || a < 0) { + notifyErr( + 'Invalid value. Please enter a valid 64-bit unsigned integer.' 
+ ); + return false; + } + return true; + } else if (setting.config_value_type === DynconfValueType.BOOL) { + if (notNullValue !== 'true' && notNullValue !== 'false') { + notifyErr('Invalid value. Please enter true or false.'); + return false; + } + return true; + } else if (setting.config_value_type === DynconfValueType.STRING) { + return true; + } else { + notifyErr('Invalid value type'); + return false; + } + }; + const handleSave = async () => { + if (!validateNewValue() || newValue === setting.config_value) { + setNewValue(setting.config_value); + setEditMode(false); + return; + } const updatedSetting = { ...setting, config_value: newValue }; await fetch('/api/settings', { method: 'POST', @@ -80,14 +124,14 @@ const DynamicSettingItem = ({ return ( - + {editMode ? (
setNewValue(e.target.value)} variant='simple' /> @@ -104,10 +148,10 @@ const DynamicSettingItem = ({
)}
- + {setting.config_default_value || 'N/A'} - + {setting.config_description || 'N/A'} @@ -134,12 +178,13 @@ const SettingsPage = () => { fetchSettings(); }, []); - const totalPages = Math.ceil(settings.length / ROWS_PER_PAGE); - - const displayedSettings = useMemo(() => { - const filteredSettings = settings.filter((setting) => + const filteredSettings = useMemo(() => { + return settings.filter((setting) => setting.config_name.toLowerCase().includes(searchQuery.toLowerCase()) ); + }, [settings, searchQuery]); + const totalPages = Math.ceil(filteredSettings.length / ROWS_PER_PAGE); + const displayedSettings = useMemo(() => { filteredSettings.sort((a, b) => { const aValue = a[sortField]; const bValue = b[sortField]; @@ -152,7 +197,7 @@ const SettingsPage = () => { const startRow = (currentPage - 1) * ROWS_PER_PAGE; const endRow = startRow + ROWS_PER_PAGE; return filteredSettings.slice(startRow, endRow); - }, [settings, currentPage, searchQuery, sortField, sortDir]); + }, [filteredSettings, currentPage, sortDir]); const handlePrevPage = () => { if (currentPage > 1) setCurrentPage(currentPage - 1); @@ -228,6 +273,7 @@ const SettingsPage = () => { /> ))} + ); }; diff --git a/ui/tsconfig.json b/ui/tsconfig.json index c443fefcce..46009ed0ae 100644 --- a/ui/tsconfig.json +++ b/ui/tsconfig.json @@ -1,6 +1,6 @@ { "compilerOptions": { - "target": "es6", + "target": "ES2020", "lib": ["dom", "dom.iterable", "esnext"], "allowJs": true, "skipLibCheck": true, From 1b9760c57a818dce16eeb97e77a3c415e86548fa Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Fri, 7 Jun 2024 03:16:02 +0530 Subject: [PATCH 13/15] minor docker fixes (#1800) 1. `version` is apparently obsolete. 2. 
minio to latest, complains after a while otherwise --- docker-compose-dev.yml | 2 -- docker-compose.yml | 4 +--- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index 1868c755bf..c62fa71863 100644 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -1,5 +1,3 @@ -version: "3.9" - name: peerdb-quickstart-dev x-minio-config: &minio-config diff --git a/docker-compose.yml b/docker-compose.yml index 1b1617233d..75b48cf53a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,3 @@ -version: "3.9" - name: peerdb-quickstart x-minio-config: &minio-config @@ -175,7 +173,7 @@ services: - flow-api minio: - image: minio/minio + image: minio/minio:latest restart: unless-stopped volumes: - minio-data:/data From 1f5e566ca44ac5bb34a2dec3ea53ec4ec3b3fd9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 6 Jun 2024 22:16:56 +0000 Subject: [PATCH 14/15] Alpine 3.20 (#1801) --- flow/go.mod | 2 +- stacks/flow.Dockerfile | 2 +- stacks/peerdb-server.Dockerfile | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/flow/go.mod b/flow/go.mod index f16ccf1f1c..abba2c42f2 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -1,6 +1,6 @@ module github.com/PeerDB-io/peer-flow -go 1.22.3 +go 1.22.4 require ( cloud.google.com/go v0.114.0 diff --git a/stacks/flow.Dockerfile b/stacks/flow.Dockerfile index 60b164b93f..945cadf6c3 100644 --- a/stacks/flow.Dockerfile +++ b/stacks/flow.Dockerfile @@ -18,7 +18,7 @@ WORKDIR /root/flow ENV CGO_ENABLED=1 RUN go build -ldflags="-s -w" -o /root/peer-flow -FROM alpine:3.19 AS flow-base +FROM alpine:3.20 AS flow-base RUN apk add --no-cache ca-certificates geos && \ adduser -s /bin/sh -D peerdb USER peerdb diff --git a/stacks/peerdb-server.Dockerfile b/stacks/peerdb-server.Dockerfile index ac85f63dce..c8acd0c5ed 100644 --- a/stacks/peerdb-server.Dockerfile +++ b/stacks/peerdb-server.Dockerfile @@ -1,6 +1,6 @@ # syntax=docker/dockerfile:1 -FROM 
lukemathwalker/cargo-chef:latest-rust-alpine3.19 as chef +FROM lukemathwalker/cargo-chef:latest-rust-alpine3.20 as chef WORKDIR /root FROM chef as planner @@ -21,7 +21,7 @@ COPY protos /root/protos WORKDIR /root/nexus RUN CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse cargo build --release --bin peerdb-server -FROM alpine:3.19 +FROM alpine:3.20 RUN apk add --no-cache ca-certificates postgresql-client curl iputils && \ adduser -s /bin/sh -D peerdb && \ install -d -m 0755 -o peerdb /var/log/peerdb From 12687b431b248a783f1284c56b0a6047308db517 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 7 Jun 2024 01:08:58 +0000 Subject: [PATCH 15/15] Remove tokio unstable flag (#1802) Was making it awkward to build individual crates, & has less value as most of product is golang now --- nexus/.cargo/config.toml | 2 - nexus/Cargo.lock | 197 ++++++--------------------- nexus/postgres-connection/src/lib.rs | 12 +- nexus/server/Cargo.toml | 1 - nexus/server/src/main.rs | 94 ++++++------- 5 files changed, 88 insertions(+), 218 deletions(-) delete mode 100644 nexus/.cargo/config.toml diff --git a/nexus/.cargo/config.toml b/nexus/.cargo/config.toml deleted file mode 100644 index bff29e6e17..0000000000 --- a/nexus/.cargo/config.toml +++ /dev/null @@ -1,2 +0,0 @@ -[build] -rustflags = ["--cfg", "tokio_unstable"] diff --git a/nexus/Cargo.lock b/nexus/Cargo.lock index f1d56e39ba..b0b3b18d59 100644 --- a/nexus/Cargo.lock +++ b/nexus/Cargo.lock @@ -127,9 +127,9 @@ dependencies = [ [[package]] name = "anstyle-query" -version = "1.0.3" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a64c907d4e79225ac72e2a354c9ce84d50ebb4586dee56c82b3ee73004f537f5" +checksum = "ad186efb764318d35165f1758e7dcef3b10628e26d41a44bc5550652e6804391" dependencies = [ "windows-sys 0.52.0", ] @@ -244,7 +244,7 @@ dependencies = [ "futures-util", "http 0.2.12", "http-body 0.4.6", - "hyper 0.14.28", + "hyper 0.14.29", "itoa", "matchit", "memchr", @@ -590,9 
+590,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.4" +version = "4.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90bc066a67923782aa8515dbaea16946c5bcc5addbd668bb80af688e53e548a0" +checksum = "a9689a29b593160de5bc4aacab7b5d54fb52231de70122626c178e6a368994c7" dependencies = [ "clap_builder", "clap_derive", @@ -600,9 +600,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.2" +version = "4.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae129e2e766ae0ec03484e609954119f123cc1fe650337e155d03b022f24f7b4" +checksum = "2e5387378c84f6faa26890ebf9f0a92989f8873d4d380467bcd0d8d8620424df" dependencies = [ "anstream", "anstyle", @@ -612,9 +612,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.4" +version = "4.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "528131438037fd55894f62d6e9f068b8f45ac57ffa77517819645d10aed04f64" +checksum = "c780290ccf4fb26629baa7a1081e68ced113f1d3ec302fa5948f1c381ebf06c6" dependencies = [ "heck 0.5.0", "proc-macro2", @@ -624,9 +624,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.7.0" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" +checksum = "4b82cf0babdbd58558212896d1a4272303a57bdb245c2bf1147185fb45640e70" [[package]] name = "cmake" @@ -643,43 +643,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422" -[[package]] -name = "console-api" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd326812b3fd01da5bb1af7d340d0d555fd3d4b641e7f1dfcf5962a902952787" -dependencies = [ - "futures-core", - "prost", - "prost-types", - "tonic 0.10.2", - "tracing-core", -] - -[[package]] -name = "console-subscriber" 
-version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7481d4c57092cd1c19dd541b92bdce883de840df30aa5d03fd48a3935c01842e" -dependencies = [ - "console-api", - "crossbeam-channel", - "crossbeam-utils", - "futures-task", - "hdrhistogram", - "humantime", - "prost-types", - "serde", - "serde_json", - "thread_local", - "tokio", - "tokio-stream", - "tonic 0.10.2", - "tracing", - "tracing-core", - "tracing-subscriber", -] - [[package]] name = "const-oid" version = "0.9.6" @@ -1207,19 +1170,6 @@ dependencies = [ "allocator-api2", ] -[[package]] -name = "hdrhistogram" -version = "7.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" -dependencies = [ - "base64 0.21.7", - "byteorder", - "flate2", - "nom", - "num-traits", -] - [[package]] name = "heck" version = "0.4.1" @@ -1321,17 +1271,11 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" -[[package]] -name = "humantime" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" - [[package]] name = "hyper" -version = "0.14.28" +version = "0.14.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" +checksum = "f361cde2f109281a220d4307746cdfd5ee3f410da58a70377762396775634b33" dependencies = [ "bytes", "futures-channel", @@ -1378,7 +1322,7 @@ checksum = "399c78f9338483cb7e630c8474b07268983c6bd5acee012e4211f9f7bb21b070" dependencies = [ "futures-util", "http 0.2.12", - "hyper 0.14.28", + "hyper 0.14.29", "log", "rustls 0.22.4", "rustls-native-certs", @@ -1410,7 +1354,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper 0.14.28", + "hyper 0.14.29", "pin-project-lite", "tokio", "tokio-io-timeout", @@ -1662,15 +1606,6 @@ dependencies = [ "pkg-config", ] -[[package]] -name = "matchers" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" -dependencies = [ - "regex-automata 0.1.10", -] - [[package]] name = "matchit" version = "0.7.3" @@ -2181,7 +2116,6 @@ dependencies = [ "cargo-deb", "catalog", "clap", - "console-subscriber", "dashmap", "dotenvy", "flow-rs", @@ -2382,7 +2316,7 @@ version = "0.1.0" dependencies = [ "anyhow", "pt", - "rustls 0.23.8", + "rustls 0.23.9", "tokio", "tokio-postgres", "tokio-postgres-rustls", @@ -2562,7 +2496,7 @@ dependencies = [ "serde", "serde_json", "sqlparser", - "tonic 0.11.0", + "tonic", "tonic-reflection", ] @@ -2726,17 +2660,8 @@ checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.6", - "regex-syntax 0.8.3", -] - -[[package]] -name = "regex-automata" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" -dependencies = [ - "regex-syntax 0.6.29", + "regex-automata", + "regex-syntax", ] [[package]] @@ -2747,15 +2672,9 @@ checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.8.3", + "regex-syntax", ] -[[package]] -name = "regex-syntax" -version = "0.6.29" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" - [[package]] name = "regex-syntax" version = "0.8.3" @@ -2936,9 +2855,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.8" +version = "0.23.9" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "79adb16721f56eb2d843e67676896a61ce7a0fa622dc18d3e372477a029d2740" +checksum = "a218f0f6d05669de4eabfb24f31ce802035c952429d037507b4a4a39f0e60c5b" dependencies = [ "log", "once_cell", @@ -3410,9 +3329,9 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" [[package]] name = "tar" -version = "0.4.40" +version = "0.4.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b16afcea1f22891c49a00c751c7b63b2233284064f11a200fc624137c51e2ddb" +checksum = "cb797dad5fb5b76fcf519e702f4a589483b5ef06567f160c392832c1f5e44909" dependencies = [ "filetime", "libc", @@ -3524,7 +3443,6 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", - "tracing", "windows-sys 0.48.0", ] @@ -3582,7 +3500,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04fb792ccd6bbcd4bba408eb8a292f70fc4a3589e5d793626f45190e6454b6ab" dependencies = [ "ring", - "rustls 0.23.8", + "rustls 0.23.9", "tokio", "tokio-postgres", "tokio-rustls 0.26.0", @@ -3606,7 +3524,7 @@ version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls 0.23.8", + "rustls 0.23.9", "rustls-pki-types", "tokio", ] @@ -3637,14 +3555,14 @@ dependencies = [ [[package]] name = "toml" -version = "0.8.13" +version = "0.8.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4e43f8cc456c9704c851ae29c67e17ef65d2c30017c17a9765b89c382dc8bba" +checksum = "6f49eb2ab21d2f26bd6db7bf383edc527a7ebaee412d17af4d40fdccd442f335" dependencies = [ "serde", "serde_spanned", "toml_datetime", - "toml_edit 0.22.13", + "toml_edit 0.22.14", ] [[package]] @@ -3669,42 +3587,15 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.22.13" +version = "0.22.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"c127785850e8c20836d49732ae6abfa47616e60bf9d9f57c43c250361a9db96c" +checksum = "f21c7aaf97f1bd9ca9d4f9e73b0a6c74bd5afef56f2bc931943a6e1c37e04e38" dependencies = [ "indexmap 2.2.6", "serde", "serde_spanned", "toml_datetime", - "winnow 0.6.9", -] - -[[package]] -name = "tonic" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e" -dependencies = [ - "async-stream", - "async-trait", - "axum", - "base64 0.21.7", - "bytes", - "h2", - "http 0.2.12", - "http-body 0.4.6", - "hyper 0.14.28", - "hyper-timeout", - "percent-encoding", - "pin-project", - "prost", - "tokio", - "tokio-stream", - "tower", - "tower-layer", - "tower-service", - "tracing", + "winnow 0.6.13", ] [[package]] @@ -3721,7 +3612,7 @@ dependencies = [ "h2", "http 0.2.12", "http-body 0.4.6", - "hyper 0.14.28", + "hyper 0.14.29", "hyper-timeout", "percent-encoding", "pin-project", @@ -3744,7 +3635,7 @@ dependencies = [ "prost", "tokio", "tokio-stream", - "tonic 0.11.0", + "tonic", ] [[package]] @@ -3757,7 +3648,7 @@ dependencies = [ "prost-types", "tokio", "tokio-stream", - "tonic 0.11.0", + "tonic", ] [[package]] @@ -3853,14 +3744,10 @@ version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ - "matchers", "nu-ansi-term", - "once_cell", - "regex", "sharded-slab", "smallvec", "thread_local", - "tracing", "tracing-core", "tracing-log", ] @@ -3917,9 +3804,9 @@ checksum = "e4259d9d4425d9f0661581b804cb85fe66a4c631cadd8f490d1c13a35d5d9291" [[package]] name = "unicode-width" -version = "0.1.12" +version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68f5e5f3158ecfd4b8ff6fe086db7c8467a2dfdac97fe420f2b7c4aa97af66d6" +checksum = "0336d538f7abc86d282a4189614dfaa90810dfc2c6f6427eaf88e16311dd225d" [[package]] name = "untrusted" @@ -4129,9 +4016,9 @@ 
dependencies = [ [[package]] name = "webpki-roots" -version = "0.26.1" +version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3de34ae270483955a94f4b21bdaaeb83d508bb84a01435f393818edb0012009" +checksum = "3c452ad30530b54a4d8e71952716a212b08efd0f3562baa66c29a618b07da7c3" dependencies = [ "rustls-pki-types", ] @@ -4337,9 +4224,9 @@ dependencies = [ [[package]] name = "winnow" -version = "0.6.9" +version = "0.6.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86c949fede1d13936a99f14fafd3e76fd642b556dd2ce96287fbe2e0151bfac6" +checksum = "59b5e5f6c299a3c7890b876a2a587f3115162487e704907d9b6cd29473052ba1" dependencies = [ "memchr", ] @@ -4402,7 +4289,7 @@ dependencies = [ "base64 0.21.7", "futures", "http 0.2.12", - "hyper 0.14.28", + "hyper 0.14.29", "hyper-rustls 0.25.0", "itertools 0.12.1", "log", diff --git a/nexus/postgres-connection/src/lib.rs b/nexus/postgres-connection/src/lib.rs index 4328978adf..7b2591687a 100644 --- a/nexus/postgres-connection/src/lib.rs +++ b/nexus/postgres-connection/src/lib.rs @@ -91,13 +91,11 @@ pub async fn connect_postgres(config: &PostgresConfig) -> anyhow::Result TracerGuards { - let console_layer = console_subscriber::spawn(); - // also log to peerdb.log in log_dir let file_appender = tracing_appender::rolling::never(log_dir, "peerdb.log"); let (non_blocking, _guard) = tracing_appender::non_blocking(file_appender); @@ -1145,16 +1143,9 @@ fn setup_tracing(log_dir: &str) -> TracerGuards { let fmt_stdout_layer = fmt::layer().with_target(false).with_writer(std::io::stdout); - // add min tracing as info - let filter_layer = EnvFilter::try_from_default_env() - .or_else(|_| EnvFilter::try_new("info")) - .unwrap(); - tracing_subscriber::registry() - .with(console_layer) .with(fmt_stdout_layer) .with(fmt_file_layer) - .with(filter_layer) .init(); // return guard so that the file appender is not dropped @@ -1225,55 +1216,52 @@ pub async fn main() -> anyhow::Result<()> { 
let (mut socket, _) = tokio::select! { _ = sigintstream.recv() => return Ok(()), v = listener.accept() => v, - } - .unwrap(); + }?; let conn_flow_handler = flow_handler.clone(); let conn_peer_conns = peer_conns.clone(); let peerdb_fdw_mode = args.peerdb_fwd_mode == "true"; let authenticator_ref = authenticator.make(); let pg_config = catalog_config.to_postgres_config(); - tokio::task::Builder::new() - .name("tcp connection handler") - .spawn(async move { - match Catalog::new(pg_config).await { - Ok(catalog) => { - let conn_uuid = uuid::Uuid::new_v4(); - let tracker = PeerConnectionTracker::new(conn_uuid, conn_peer_conns); - - let processor = Arc::new(NexusBackend::new( - Arc::new(catalog), - tracker, - conn_flow_handler, - peerdb_fdw_mode, - )); - process_socket( - socket, - None, - authenticator_ref, - processor.clone(), - processor, - ) - .await - } - Err(e) => { - tracing::error!("Failed to connect to catalog: {}", e); - - let mut buf = BytesMut::with_capacity(1024); - buf.put_u8(b'E'); - buf.put_i32(0); - buf.put(&b"FATAL"[..]); - buf.put_u8(0); - write!(buf, "Failed to connect to catalog: {e}").ok(); - buf.put_u8(0); - buf.put_u8(b'\0'); - - socket.write_all(&buf).await?; - socket.shutdown().await?; - - Ok(()) - } + tokio::task::spawn(async move { + match Catalog::new(pg_config).await { + Ok(catalog) => { + let conn_uuid = uuid::Uuid::new_v4(); + let tracker = PeerConnectionTracker::new(conn_uuid, conn_peer_conns); + + let processor = Arc::new(NexusBackend::new( + Arc::new(catalog), + tracker, + conn_flow_handler, + peerdb_fdw_mode, + )); + process_socket( + socket, + None, + authenticator_ref, + processor.clone(), + processor, + ) + .await } - })?; + Err(e) => { + tracing::error!("Failed to connect to catalog: {}", e); + + let mut buf = BytesMut::with_capacity(1024); + buf.put_u8(b'E'); + buf.put_i32(0); + buf.put(&b"FATAL"[..]); + buf.put_u8(0); + write!(buf, "Failed to connect to catalog: {e}").ok(); + buf.put_u8(0); + buf.put_u8(b'\0'); + + 
socket.write_all(&buf).await?; + socket.shutdown().await?; + + Ok(()) + } + } + }); } }