Skip to content

Commit

Permalink
Add index mode to esql index source (#108747)
Browse files Browse the repository at this point in the history
This PR adds index mode to the ESQL index source. Specifically, METRICS 
commands will now run with TimeSeriesSortedSourceOperatorFactory. Also,
it removes the time-series pragmas in favor of METRICS commands.
  • Loading branch information
dnhatn committed May 23, 2024
1 parent 7f35f1b commit 4b50858
Show file tree
Hide file tree
Showing 22 changed files with 247 additions and 189 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ static TransportVersion def(int id) {
public static final TransportVersion SIMULATE_VALIDATES_MAPPINGS = def(8_665_00_0);
public static final TransportVersion RULE_QUERY_RENAME = def(8_666_00_0);
public static final TransportVersion SPARSE_VECTOR_QUERY_ADDED = def(8_667_00_0);
public static final TransportVersion ESQL_ADD_INDEX_MODE_TO_SOURCE = def(8_668_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@

package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.Build;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
Expand All @@ -24,9 +23,7 @@ public class TimeSeriesIT extends AbstractEsqlIntegTestCase {

@Override
protected EsqlQueryResponse run(EsqlQueryRequest request) {
assumeTrue("timseries requires pragmas", canUseQueryPragmas());
var settings = Settings.builder().put(request.pragmas().getSettings()).put(QueryPragmas.TIME_SERIES_MODE.getKey(), "true").build();
request.pragmas(new QueryPragmas(settings));
assumeTrue("time series available in snapshot builds only", Build.current().isSnapshot());
return super.run(request);
}

Expand Down Expand Up @@ -65,16 +62,17 @@ public void testSimpleMetrics() {
.get();
List<String> pods = List.of("p1", "p2", "p3");
long startTime = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-04-15T00:00:00Z");
int numDocs = between(10, 10);
Map<String, List<Integer>> cpus = new HashMap<>();
int numDocs = between(10, 100);
record Doc(String pod, long timestamp, double cpu) {}
List<Doc> docs = new ArrayList<>();
for (int i = 0; i < numDocs; i++) {
String pod = randomFrom(pods);
int cpu = randomIntBetween(0, 100);
cpus.computeIfAbsent(pod, k -> new ArrayList<>()).add(cpu);
long timestamp = startTime + (1000L * i);
docs.add(new Doc(pod, timestamp, cpu));
client().prepareIndex("pods").setSource("@timestamp", timestamp, "pod", pod, "cpu", cpu).get();
}
List<String> sortedGroups = cpus.keySet().stream().sorted().toList();
List<String> sortedGroups = docs.stream().map(d -> d.pod).distinct().sorted().toList();
client().admin().indices().prepareRefresh("pods").get();
try (EsqlQueryResponse resp = run("METRICS pods load=avg(cpu) BY pod | SORT pod")) {
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
Expand All @@ -83,10 +81,24 @@ public void testSimpleMetrics() {
List<Object> r = rows.get(i);
String pod = (String) r.get(1);
assertThat(pod, equalTo(sortedGroups.get(i)));
List<Integer> values = cpus.get(pod);
List<Double> values = docs.stream().filter(d -> d.pod.equals(pod)).map(d -> d.cpu).toList();
double avg = values.stream().mapToDouble(n -> n).sum() / values.size();
assertThat((double) r.get(0), equalTo(avg));
}
}
try (EsqlQueryResponse resp = run("METRICS pods | SORT @timestamp DESC | KEEP @timestamp, pod, cpu | LIMIT 5")) {
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
List<Doc> topDocs = docs.stream().sorted(Comparator.comparingLong(Doc::timestamp).reversed()).limit(5).toList();
assertThat(rows, hasSize(topDocs.size()));
for (int i = 0; i < rows.size(); i++) {
List<Object> r = rows.get(i);
long timestamp = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis((String) r.get(0));
String pod = (String) r.get(1);
double cpu = (Double) r.get(2);
assertThat(topDocs.get(i).timestamp, equalTo(timestamp));
assertThat(topDocs.get(i).pod, equalTo(pod));
assertThat(topDocs.get(i).cpu, equalTo(cpu));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,13 @@ protected LogicalPlan rule(EsqlUnresolvedRelation plan, AnalyzerContext context)
if (context.indexResolution().isValid() == false) {
return plan.unresolvedMessage().equals(context.indexResolution().toString())
? plan
: new EsqlUnresolvedRelation(plan.source(), plan.table(), plan.metadataFields(), context.indexResolution().toString());
: new EsqlUnresolvedRelation(
plan.source(),
plan.table(),
plan.metadataFields(),
plan.indexMode(),
context.indexResolution().toString()
);
}
TableIdentifier table = plan.table();
if (context.indexResolution().matches(table.index()) == false) {
Expand All @@ -171,14 +177,15 @@ protected LogicalPlan rule(EsqlUnresolvedRelation plan, AnalyzerContext context)
plan.source(),
plan.table(),
plan.metadataFields(),
plan.indexMode(),
"invalid [" + table + "] resolution to [" + context.indexResolution() + "]"
);
}

EsIndex esIndex = context.indexResolution().get();
var attributes = mappingAsAttributes(plan.source(), esIndex.mapping());
attributes.addAll(plan.metadataFields());
return new EsRelation(plan.source(), esIndex, attributes.isEmpty() ? NO_FIELDS : attributes);
return new EsRelation(plan.source(), esIndex, attributes.isEmpty() ? NO_FIELDS : attributes, plan.indexMode());
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.common.lucene.BytesRefs;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.dissect.DissectParser;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
Expand Down Expand Up @@ -491,6 +492,7 @@ static EsQueryExec readEsQueryExec(PlanStreamInput in) throws IOException {
return new EsQueryExec(
in.readSource(),
readEsIndex(in),
readIndexMode(in),
readAttributes(in),
in.readOptionalNamedWriteable(QueryBuilder.class),
in.readOptionalNamed(Expression.class),
Expand All @@ -503,6 +505,7 @@ static void writeEsQueryExec(PlanStreamOutput out, EsQueryExec esQueryExec) thro
assert esQueryExec.children().size() == 0;
out.writeNoSource();
writeEsIndex(out, esQueryExec.index());
writeIndexMode(out, esQueryExec.indexMode());
writeAttributes(out, esQueryExec.output());
out.writeOptionalNamedWriteable(esQueryExec.query());
out.writeOptionalExpression(esQueryExec.limit());
Expand All @@ -511,14 +514,37 @@ static void writeEsQueryExec(PlanStreamOutput out, EsQueryExec esQueryExec) thro
}

static EsSourceExec readEsSourceExec(PlanStreamInput in) throws IOException {
return new EsSourceExec(in.readSource(), readEsIndex(in), readAttributes(in), in.readOptionalNamedWriteable(QueryBuilder.class));
return new EsSourceExec(
in.readSource(),
readEsIndex(in),
readAttributes(in),
in.readOptionalNamedWriteable(QueryBuilder.class),
readIndexMode(in)
);
}

static void writeEsSourceExec(PlanStreamOutput out, EsSourceExec esSourceExec) throws IOException {
out.writeNoSource();
writeEsIndex(out, esSourceExec.index());
writeAttributes(out, esSourceExec.output());
out.writeOptionalNamedWriteable(esSourceExec.query());
writeIndexMode(out, esSourceExec.indexMode());
}

static IndexMode readIndexMode(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_ADD_INDEX_MODE_TO_SOURCE)) {
return IndexMode.fromString(in.readString());
} else {
return IndexMode.STANDARD;
}
}

static void writeIndexMode(StreamOutput out, IndexMode indexMode) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_ADD_INDEX_MODE_TO_SOURCE)) {
out.writeString(indexMode.getName());
} else if (indexMode != IndexMode.STANDARD) {
throw new IllegalStateException("not ready to support index mode [" + indexMode + "]");
}
}

static EvalExec readEvalExec(PlanStreamInput in) throws IOException {
Expand Down Expand Up @@ -799,8 +825,9 @@ static EsRelation readEsRelation(PlanStreamInput in) throws IOException {
if (supportingEsSourceOptions(in.getTransportVersion())) {
readEsSourceOptions(in); // consume optional strings sent by remote
}
final IndexMode indexMode = readIndexMode(in);
boolean frozen = in.readBoolean();
return new EsRelation(source, esIndex, attributes, frozen);
return new EsRelation(source, esIndex, attributes, indexMode, frozen);
}

static void writeEsRelation(PlanStreamOutput out, EsRelation relation) throws IOException {
Expand All @@ -811,6 +838,7 @@ static void writeEsRelation(PlanStreamOutput out, EsRelation relation) throws IO
if (supportingEsSourceOptions(out.getTransportVersion())) {
writeEsSourceOptions(out); // write (null) string fillers expected by remote
}
writeIndexMode(out, relation.indexMode());
out.writeBoolean(relation.frozen());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec;
import org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec.Stat;
import org.elasticsearch.xpack.esql.plan.physical.EsTimeseriesQueryExec;
import org.elasticsearch.xpack.esql.plan.physical.EvalExec;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec;
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
Expand Down Expand Up @@ -85,11 +84,9 @@ public class LocalPhysicalPlanOptimizer extends ParameterizedRuleExecutor<Physic
public static final EsqlTranslatorHandler TRANSLATOR_HANDLER = new EsqlTranslatorHandler();

private final PhysicalVerifier verifier = PhysicalVerifier.INSTANCE;
private final boolean timeSeriesMode;

public LocalPhysicalPlanOptimizer(LocalPhysicalOptimizerContext context) {
super(context);
this.timeSeriesMode = context.configuration().pragmas().timeSeriesMode();
}

public PhysicalPlan localOptimize(PhysicalPlan plan) {
Expand All @@ -106,7 +103,7 @@ PhysicalPlan verify(PhysicalPlan plan) {

protected List<Batch<PhysicalPlan>> rules(boolean optimizeForEsSource) {
List<Rule<?, PhysicalPlan>> esSourceRules = new ArrayList<>(4);
esSourceRules.add(new ReplaceAttributeSourceWithDocId(timeSeriesMode));
esSourceRules.add(new ReplaceAttributeSourceWithDocId());

if (optimizeForEsSource) {
esSourceRules.add(new PushTopNToSource());
Expand All @@ -131,20 +128,13 @@ protected List<Batch<PhysicalPlan>> batches() {

private static class ReplaceAttributeSourceWithDocId extends OptimizerRule<EsSourceExec> {

private final boolean timeSeriesMode;

ReplaceAttributeSourceWithDocId(boolean timeSeriesMode) {
ReplaceAttributeSourceWithDocId() {
super(UP);
this.timeSeriesMode = timeSeriesMode;
}

@Override
protected PhysicalPlan rule(EsSourceExec plan) {
if (timeSeriesMode) {
return new EsTimeseriesQueryExec(plan.source(), plan.index(), plan.query());
} else {
return new EsQueryExec(plan.source(), plan.index(), plan.query());
}
return new EsQueryExec(plan.source(), plan.index(), plan.indexMode(), plan.query());
}
}

Expand Down Expand Up @@ -230,6 +220,7 @@ protected PhysicalPlan rule(FilterExec filterExec, LocalPhysicalOptimizerContext
queryExec = new EsQueryExec(
queryExec.source(),
queryExec.index(),
queryExec.indexMode(),
queryExec.output(),
query,
queryExec.limit(),
Expand Down Expand Up @@ -325,10 +316,7 @@ private static class PushTopNToSource extends PhysicalOptimizerRules.Parameteriz
protected PhysicalPlan rule(TopNExec topNExec, LocalPhysicalOptimizerContext ctx) {
PhysicalPlan plan = topNExec;
PhysicalPlan child = topNExec.child();

boolean canPushDownTopN = child instanceof EsQueryExec
|| (child instanceof ExchangeExec exchangeExec && exchangeExec.child() instanceof EsQueryExec);
if (canPushDownTopN && canPushDownOrders(topNExec.order(), x -> hasIdenticalDelegate(x, ctx.searchStats()))) {
if (canPushSorts(child) && canPushDownOrders(topNExec.order(), x -> hasIdenticalDelegate(x, ctx.searchStats()))) {
var sorts = buildFieldSorts(topNExec.order());
var limit = topNExec.limit();

Expand All @@ -355,6 +343,16 @@ private List<EsQueryExec.FieldSort> buildFieldSorts(List<Order> orders) {
}
}

private static boolean canPushSorts(PhysicalPlan plan) {
if (plan instanceof EsQueryExec queryExec) {
return queryExec.canPushSorts();
}
if (plan instanceof ExchangeExec exchangeExec && exchangeExec.child() instanceof EsQueryExec queryExec) {
return queryExec.canPushSorts();
}
return false;
}

/**
* Looks for the case where certain stats exist right before the query and thus can be pushed down.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.core.Tuple;
import org.elasticsearch.dissect.DissectException;
import org.elasticsearch.dissect.DissectParser;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.core.common.Failure;
import org.elasticsearch.xpack.esql.core.expression.Alias;
Expand Down Expand Up @@ -234,7 +235,7 @@ public LogicalPlan visitFromCommand(EsqlBaseParser.FromCommandContext ctx) {
}
}
}
return new EsqlUnresolvedRelation(source, table, Arrays.asList(metadataMap.values().toArray(Attribute[]::new)));
return new EsqlUnresolvedRelation(source, table, Arrays.asList(metadataMap.values().toArray(Attribute[]::new)), IndexMode.STANDARD);
}

@Override
Expand Down Expand Up @@ -428,7 +429,7 @@ public LogicalPlan visitMetricsCommand(EsqlBaseParser.MetricsCommandContext ctx)
}
Source source = source(ctx);
TableIdentifier table = new TableIdentifier(source, null, visitIndexIdentifiers(ctx.indexIdentifier()));
var unresolvedRelation = new EsqlUnresolvedRelation(source, table, List.of());
var unresolvedRelation = new EsqlUnresolvedRelation(source, table, List.of(), IndexMode.TIME_SERIES);
if (ctx.aggregates == null && ctx.grouping == null) {
return unresolvedRelation;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package org.elasticsearch.xpack.esql.plan.logical;

import org.elasticsearch.index.IndexMode;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.index.EsIndex;
Expand All @@ -26,25 +27,27 @@ public class EsRelation extends LeafPlan {
private final EsIndex index;
private final List<Attribute> attrs;
private final boolean frozen;
private final IndexMode indexMode;

public EsRelation(Source source, EsIndex index, boolean frozen) {
this(source, index, flatten(source, index.mapping()), frozen);
public EsRelation(Source source, EsIndex index, IndexMode indexMode, boolean frozen) {
this(source, index, flatten(source, index.mapping()), indexMode, frozen);
}

public EsRelation(Source source, EsIndex index, List<Attribute> attributes) {
this(source, index, attributes, false);
public EsRelation(Source source, EsIndex index, List<Attribute> attributes, IndexMode indexMode) {
this(source, index, attributes, indexMode, false);
}

public EsRelation(Source source, EsIndex index, List<Attribute> attributes, boolean frozen) {
public EsRelation(Source source, EsIndex index, List<Attribute> attributes, IndexMode indexMode, boolean frozen) {
super(source);
this.index = index;
this.attrs = attributes;
this.indexMode = indexMode;
this.frozen = frozen;
}

@Override
protected NodeInfo<EsRelation> info() {
return NodeInfo.create(this, EsRelation::new, index, attrs, frozen);
return NodeInfo.create(this, EsRelation::new, index, attrs, indexMode, frozen);
}

private static List<Attribute> flatten(Source source, Map<String, EsField> mapping) {
Expand Down Expand Up @@ -78,6 +81,10 @@ public boolean frozen() {
return frozen;
}

public IndexMode indexMode() {
return indexMode;
}

@Override
public List<Attribute> output() {
return attrs;
Expand All @@ -90,7 +97,7 @@ public boolean expressionsResolved() {

@Override
public int hashCode() {
return Objects.hash(index, frozen);
return Objects.hash(index, indexMode, frozen);
}

@Override
Expand All @@ -104,7 +111,7 @@ public boolean equals(Object obj) {
}

EsRelation other = (EsRelation) obj;
return Objects.equals(index, other.index) && frozen == other.frozen;
return Objects.equals(index, other.index) && indexMode == other.indexMode() && frozen == other.frozen;
}

@Override
Expand Down

0 comments on commit 4b50858

Please sign in to comment.