Skip to content

Commit

Permalink
try to fix types
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Mar 20, 2024
1 parent fcd5f59 commit b1b106f
Showing 1 changed file with 39 additions and 11 deletions.
50 changes: 39 additions & 11 deletions flow/dynamicconf/dynamicconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,35 @@ func dynamicConfKeyExists(ctx context.Context, conn *pgxpool.Pool, key string) b
return exists.Bool
}

func dynamicConfNumber[T constraints.Integer](ctx context.Context, key string, defaultValue T) T {
func dynamicConfSigned[T constraints.Signed](ctx context.Context, key string, defaultValue T) T {
conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err)
return defaultValue
}

if !dynamicConfKeyExists(ctx, conn, key) {
return defaultValue
}

var value pgtype.Text
query := "SELECT config_value FROM alerting_settings WHERE config_name = $1"
err = conn.QueryRow(ctx, query, key).Scan(&value)
if err != nil {
logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err)
return defaultValue
}

result, err := strconv.ParseInt(value.String, 10, 32)
if err != nil {
logger.LoggerFromCtx(ctx).Error("Failed to parse uint32: %v", err)
return defaultValue
}

return T(result)
}

func dynamicConfUnsigned[T constraints.Unsigned](ctx context.Context, key string, defaultValue T) T {
conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err)
Expand Down Expand Up @@ -55,46 +83,46 @@ func dynamicConfNumber[T constraints.Integer](ctx context.Context, key string, d

// PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely
func PeerDBSlotLagMBAlertThreshold(ctx context.Context) uint32 {
return dynamicConfNumber[uint32](ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000)
return dynamicConfUnsigned[uint32](ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000)
}

// PEERDB_ALERTING_GAP_MINUTES, 0 disables all alerting entirely
func PeerDBAlertingGapMinutesAsDuration(ctx context.Context) time.Duration {
why := dynamicConfNumber[uint32](ctx, "PEERDB_ALERTING_GAP_MINUTES", 15)
why := dynamicConfUnsigned[uint32](ctx, "PEERDB_ALERTING_GAP_MINUTES", 15)
return time.Duration(why) * time.Minute
}

// PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD, 0 disables open connections alerting entirely
func PeerDBOpenConnectionsAlertThreshold(ctx context.Context) uint32 {
return dynamicConfNumber[uint32](ctx, "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5)
return dynamicConfUnsigned[uint32](ctx, "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5)
}

func PeerDBSnowflakeMergeParallelism(ctx context.Context) int {
return int(dynamicConfNumber[int32](ctx, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8))
return dynamicConfSigned(ctx, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8)
}

// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD
func PeerDBCDCDiskSpillRecordsThreshold(ctx context.Context) int64 {
return dynamicConfNumber[int64](ctx, "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000)
return dynamicConfSigned[int64](ctx, "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000)
}

// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD, negative numbers means memory threshold disabled
func PeerDBCDCDiskSpillMemPercentThreshold(ctx context.Context) int64 {
return dynamicConfNumber[int64](ctx, "PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1)
return dynamicConfSigned[int64](ctx, "PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1)
}

// PEERDB_CDC_CHANNEL_BUFFER_SIZE
func PeerDBCDCChannelBufferSize(ctx context.Context) int {
return int(dynamicConfNumber[int32](ctx, "PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18))
return dynamicConfSigned(ctx, "PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18)
}

// PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS
func PeerDBEventhubFlushTimeoutSeconds(ctx context.Context) time.Duration {
x := dynamicConfNumber[int64](ctx, "PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10)
x := dynamicConfSigned[int64](ctx, "PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10)
return time.Duration(x) * time.Second
}

// GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum
func PeerDBFlowWorkerMaxMemBytes(ctx context.Context) uint64 {
return dynamicConfNumber[uint64](ctx, "GOMEMLIMIT", 0)
func PeerDBFlowWorkerMaxMemBytes(ctx context.Context) uint32 {
return dynamicConfUnsigned[uint32](ctx, "GOMEMLIMIT", 0)
}

0 comments on commit b1b106f

Please sign in to comment.