Skip to content

Commit

Permalink
chore: remove the v2 table from query-service
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv committed May 17, 2024
1 parent 6c1b236 commit 2e746bd
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 257 deletions.
41 changes: 26 additions & 15 deletions pkg/query-service/app/clickhouseReader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,17 @@ const (
signozTraceTableName = "distributed_signoz_index_v2"
signozTraceLocalTableName = "signoz_index_v2"
signozMetricDBName = "signoz_metrics"
signozSampleLocalTableName = "samples_v2"
signozSampleTableName = "distributed_samples_v2"
signozTSTableName = "distributed_time_series_v2"
signozTSTableNameV4 = "distributed_time_series_v4"
signozTSTableNameV41Day = "distributed_time_series_v4_1day"
signozSampleLocalTableName = "samples_v4"
signozSampleTableName = "distributed_samples_v4"

signozTSLocalTableNameV4 = "time_series_v4"
signozTSTableNameV4 = "distributed_time_series_v4"

signozTSLocalTableNameV46Hrs = "time_series_v4_6hrs"
signozTSTableNameV46Hrs = "distributed_time_series_v4_6hrs"

signozTSLocalTableNameV41Day = "time_series_v4_1day"
signozTSTableNameV41Day = "distributed_time_series_v4_1day"

minTimespanForProgressiveSearch = time.Hour
minTimespanForProgressiveSearchMargin = time.Minute
Expand Down Expand Up @@ -2382,15 +2388,17 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
}

case constants.MetricsTTL:
tableName := signozMetricDBName + "." + signozSampleLocalTableName
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")}
}
if statusItem.Status == constants.StatusPending {
return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
tableNames := []string{signozMetricDBName + "." + signozSampleLocalTableName, signozMetricDBName + "." + signozTSLocalTableNameV4, signozMetricDBName + "." + signozTSLocalTableNameV46Hrs, signozMetricDBName + "." + signozTSLocalTableNameV41Day}
for _, tableName := range tableNames {
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
}
if statusItem.Status == constants.StatusPending {
return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
}
}
go func(tableName string) {
metricTTL := func(tableName string) {
_, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration)
if dbErr != nil {
zap.L().Error("Error in inserting to ttl_status table", zap.Error(dbErr))
Expand Down Expand Up @@ -2434,7 +2442,10 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
return
}
}(tableName)
}
for _, tableName := range tableNames {
go metricTTL(tableName)
}
case constants.LogsTTL:
tableName := r.logsDB + "." + r.logsLocalTable
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
Expand Down Expand Up @@ -3259,7 +3270,7 @@ func (r *ClickHouseReader) FetchTemporality(ctx context.Context, metricNames []s

func (r *ClickHouseReader) GetTimeSeriesInfo(ctx context.Context) (map[string]interface{}, error) {

queryStr := fmt.Sprintf("SELECT count() as count from %s.%s where metric_name not like 'signoz_%%' group by metric_name order by count desc;", signozMetricDBName, signozTSTableName)
queryStr := fmt.Sprintf("SELECT countDistinct(fingerprint) as count from %s.%s where metric_name not like 'signoz_%%' group by metric_name order by count desc;", signozMetricDBName, signozTSTableNameV41Day)

rows, _ := r.db.Query(ctx, queryStr)

Expand Down
2 changes: 1 addition & 1 deletion pkg/query-service/app/metrics/v3/cumulative_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func stepForTableCumulative(start, end int64) int64 {
return int64(step)
}

func buildMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tableName string) (string, error) {
func buildMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery) (string, error) {

step := stepForTableCumulative(start, end)

Expand Down
2 changes: 1 addition & 1 deletion pkg/query-service/app/metrics/v3/cumulative_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestPanelTableForCumulative(t *testing.T) {

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
query, err := buildMetricQueryForTable(1689255866000, 1689257640000, 1800, c.query, "distributed_time_series_v4")
query, err := buildMetricQueryForTable(1689255866000, 1689257640000, 1800, c.query)
if err != nil {
t.Fatalf("unexpected error: %v\n", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/query-service/app/metrics/v3/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/utils"
)

func buildDeltaMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName string) (string, error) {
func buildDeltaMetricQuery(start, end, step int64, mq *v3.BuilderQuery) (string, error) {

metricQueryGroupBy := mq.GroupBy

Expand Down
2 changes: 1 addition & 1 deletion pkg/query-service/app/metrics/v3/delta_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/utils"
)

func buildDeltaMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tableName string) (string, error) {
func buildDeltaMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery) (string, error) {

// round up to the nearest multiple of 60
step := int64(math.Ceil(float64(end-start+1)/1000/60) * 60)
Expand Down
2 changes: 1 addition & 1 deletion pkg/query-service/app/metrics/v3/delta_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestPanelTableForDelta(t *testing.T) {

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
query, err := buildDeltaMetricQueryForTable(1689255866000, 1689257640000, 1800, c.query, "distributed_time_series_v4")
query, err := buildDeltaMetricQueryForTable(1689255866000, 1689257640000, 1800, c.query)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand Down
96 changes: 5 additions & 91 deletions pkg/query-service/app/metrics/v3/query_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,93 +52,7 @@ var aggregateOperatorToSQLFunc = map[v3.AggregateOperator]string{
// See https://github.com/SigNoz/signoz/issues/2151#issuecomment-1467249056
var rateWithoutNegative = `If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) `

// buildMetricsTimeSeriesFilterQuery builds the sub-query to be used for filtering
// timeseries based on search criteria
func buildMetricsTimeSeriesFilterQuery(fs *v3.FilterSet, groupTags []v3.AttributeKey, mq *v3.BuilderQuery) (string, error) {
metricName := mq.AggregateAttribute.Key
aggregateOperator := mq.AggregateOperator
var conditions []string
if mq.Temporality == v3.Delta {
conditions = append(conditions, fmt.Sprintf("metric_name = %s AND temporality = '%s' ", utils.ClickHouseFormattedValue(metricName), v3.Delta))
} else {
conditions = append(conditions, fmt.Sprintf("metric_name = %s AND temporality IN ['%s', '%s']", utils.ClickHouseFormattedValue(metricName), v3.Cumulative, v3.Unspecified))
}

if fs != nil && len(fs.Items) != 0 {
for _, item := range fs.Items {
toFormat := item.Value
op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator))))
// if the received value is an array for like/match op, just take the first value
// or should we throw an error?
if op == v3.FilterOperatorLike || op == v3.FilterOperatorRegex || op == v3.FilterOperatorNotLike || op == v3.FilterOperatorNotRegex {
x, ok := item.Value.([]interface{})
if ok {
if len(x) == 0 {
continue
}
toFormat = x[0]
}
}

if op == v3.FilterOperatorContains || op == v3.FilterOperatorNotContains {
toFormat = fmt.Sprintf("%%%s%%", toFormat)
}
fmtVal := utils.ClickHouseFormattedValue(toFormat)
switch op {
case v3.FilterOperatorEqual:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') = %s", item.Key.Key, fmtVal))
case v3.FilterOperatorNotEqual:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') != %s", item.Key.Key, fmtVal))
case v3.FilterOperatorIn:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') IN %s", item.Key.Key, fmtVal))
case v3.FilterOperatorNotIn:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') NOT IN %s", item.Key.Key, fmtVal))
case v3.FilterOperatorLike:
conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case v3.FilterOperatorNotLike:
conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case v3.FilterOperatorRegex:
conditions = append(conditions, fmt.Sprintf("match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case v3.FilterOperatorNotRegex:
conditions = append(conditions, fmt.Sprintf("not match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case v3.FilterOperatorGreaterThan:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') > %s", item.Key.Key, fmtVal))
case v3.FilterOperatorGreaterThanOrEq:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') >= %s", item.Key.Key, fmtVal))
case v3.FilterOperatorLessThan:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') < %s", item.Key.Key, fmtVal))
case v3.FilterOperatorLessThanOrEq:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') <= %s", item.Key.Key, fmtVal))
case v3.FilterOperatorContains:
conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case v3.FilterOperatorNotContains:
conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case v3.FilterOperatorExists:
conditions = append(conditions, fmt.Sprintf("has(JSONExtractKeys(labels), '%s')", item.Key.Key))
case v3.FilterOperatorNotExists:
conditions = append(conditions, fmt.Sprintf("not has(JSONExtractKeys(labels), '%s')", item.Key.Key))
default:
return "", fmt.Errorf("unsupported operation")
}
}
}
queryString := strings.Join(conditions, " AND ")

var selectLabels string
if aggregateOperator == v3.AggregateOperatorNoOp || aggregateOperator == v3.AggregateOperatorRate {
selectLabels = "labels,"
} else {
for _, tag := range groupTags {
selectLabels += fmt.Sprintf(" JSONExtractString(labels, '%s') as %s,", tag.Key, tag.Key)
}
}

filterSubQuery := fmt.Sprintf("SELECT %s fingerprint FROM %s.%s WHERE %s", selectLabels, constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_LOCAL_TABLENAME, queryString)

return filterSubQuery, nil
}

func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName string) (string, error) {
func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery) (string, error) {

metricQueryGroupBy := mq.GroupBy

Expand Down Expand Up @@ -435,15 +349,15 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P
var err error
if mq.Temporality == v3.Delta {
if panelType == v3.PanelTypeTable {
query, err = buildDeltaMetricQueryForTable(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME)
query, err = buildDeltaMetricQueryForTable(start, end, mq.StepInterval, mq)
} else {
query, err = buildDeltaMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME)
query, err = buildDeltaMetricQuery(start, end, mq.StepInterval, mq)
}
} else {
if panelType == v3.PanelTypeTable {
query, err = buildMetricQueryForTable(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME)
query, err = buildMetricQueryForTable(start, end, mq.StepInterval, mq)
} else {
query, err = buildMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME)
query, err = buildMetricQuery(start, end, mq.StepInterval, mq)
}
}

Expand Down
143 changes: 0 additions & 143 deletions pkg/query-service/app/metrics/v3/query_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,149 +102,6 @@ func TestBuildQueryWithMultipleQueries(t *testing.T) {
})
}

func TestBuildQueryOperators(t *testing.T) {
testCases := []struct {
operator v3.FilterOperator
filterSet v3.FilterSet
expectedWhereClause string
}{
{
operator: v3.FilterOperatorEqual,
filterSet: v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "service_name"}, Value: "route", Operator: v3.FilterOperatorEqual},
},
},
expectedWhereClause: "JSONExtractString(labels, 'service_name') = 'route'",
},
{
operator: v3.FilterOperatorNotEqual,
filterSet: v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "service_name"}, Value: "route", Operator: v3.FilterOperatorNotEqual},
},
},
expectedWhereClause: "JSONExtractString(labels, 'service_name') != 'route'",
},
{
operator: v3.FilterOperatorRegex,
filterSet: v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "service_name"}, Value: "out", Operator: v3.FilterOperatorRegex},
},
},
expectedWhereClause: "match(JSONExtractString(labels, 'service_name'), 'out')",
},
{
operator: v3.FilterOperatorNotRegex,
filterSet: v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "service_name"}, Value: "out", Operator: v3.FilterOperatorNotRegex},
},
},
expectedWhereClause: "not match(JSONExtractString(labels, 'service_name'), 'out')",
},
{
operator: v3.FilterOperatorIn,
filterSet: v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "service_name"}, Value: []interface{}{"route", "driver"}, Operator: v3.FilterOperatorIn},
},
},
expectedWhereClause: "JSONExtractString(labels, 'service_name') IN ['route','driver']",
},
{
operator: v3.FilterOperatorNotIn,
filterSet: v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "service_name"}, Value: []interface{}{"route", "driver"}, Operator: v3.FilterOperatorNotIn},
},
},
expectedWhereClause: "JSONExtractString(labels, 'service_name') NOT IN ['route','driver']",
},
{
operator: v3.FilterOperatorExists,
filterSet: v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "horn"}, Operator: v3.FilterOperatorExists},
},
},
expectedWhereClause: "has(JSONExtractKeys(labels), 'horn')",
},
{
operator: v3.FilterOperatorNotExists,
filterSet: v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "horn"}, Operator: v3.FilterOperatorNotExists},
},
},
expectedWhereClause: "not has(JSONExtractKeys(labels), 'horn')",
},
{
operator: v3.FilterOperatorContains,
filterSet: v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "service_name"}, Value: "out", Operator: v3.FilterOperatorContains},
},
},
expectedWhereClause: "like(JSONExtractString(labels, 'service_name'), '%out%')",
},
{
operator: v3.FilterOperatorNotContains,
filterSet: v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "serice_name"}, Value: "out", Operator: v3.FilterOperatorNotContains},
},
},
expectedWhereClause: "notLike(JSONExtractString(labels, 'serice_name'), '%out%')",
},
{
operator: v3.FilterOperatorLike,
filterSet: v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "service_name"}, Value: "dri", Operator: v3.FilterOperatorLike},
},
},
expectedWhereClause: "like(JSONExtractString(labels, 'service_name'), 'dri')",
},
{
operator: v3.FilterOperatorNotLike,
filterSet: v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "serice_name"}, Value: "dri", Operator: v3.FilterOperatorNotLike},
},
},
expectedWhereClause: "notLike(JSONExtractString(labels, 'serice_name'), 'dri')",
},
}

for i, tc := range testCases {
t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
mq := v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
AggregateAttribute: v3.AttributeKey{Key: "signoz_calls_total"},
AggregateOperator: v3.AggregateOperatorSum,
}
whereClause, err := buildMetricsTimeSeriesFilterQuery(&tc.filterSet, []v3.AttributeKey{}, &mq)
require.NoError(t, err)
require.Contains(t, whereClause, tc.expectedWhereClause)
})
}
}

func TestBuildQueryXRate(t *testing.T) {
t.Run("TestBuildQueryXRate", func(t *testing.T) {

Expand Down
3 changes: 0 additions & 3 deletions pkg/query-service/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,9 @@ var GroupByColMap = map[string]struct{}{

const (
SIGNOZ_METRIC_DBNAME = "signoz_metrics"
SIGNOZ_SAMPLES_TABLENAME = "distributed_samples_v2"
SIGNOZ_SAMPLES_V4_TABLENAME = "distributed_samples_v4"
SIGNOZ_TIMESERIES_TABLENAME = "distributed_time_series_v2"
SIGNOZ_TRACE_DBNAME = "signoz_traces"
SIGNOZ_SPAN_INDEX_TABLENAME = "distributed_signoz_index_v2"
SIGNOZ_TIMESERIES_LOCAL_TABLENAME = "time_series_v2"
SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME = "time_series_v4"
SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME = "time_series_v4_6hrs"
SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME = "time_series_v4_1day"
Expand Down

0 comments on commit 2e746bd

Please sign in to comment.