Skip to content

Commit

Permalink
[Transform] Forward indexServiceSafe exceptions to listener (#108517)
Browse files Browse the repository at this point in the history
IndexService.indexServiceSafe can throw an IndexNotFoundException while getting the Global Checkpoints.
In theory, any exception in TransportGetCheckpointNodeAction should be forwarded to the listener.

Fix #108418
  • Loading branch information
prwhelan committed May 13, 2024
1 parent 1445fd2 commit 6ecff09
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 36 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/108517.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 108517
summary: Forward `indexServiceSafe` exception to listener
area: Transform
type: bug
issues:
- 108418
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
*/
package org.elasticsearch.xpack.transform.action;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
Expand Down Expand Up @@ -34,6 +36,7 @@

public class TransportGetCheckpointNodeAction extends HandledTransportAction<Request, Response> {

private static final Logger logger = LogManager.getLogger(TransportGetCheckpointNodeAction.class);
private final IndicesService indicesService;

@Inject
Expand Down Expand Up @@ -83,17 +86,27 @@ protected static void getGlobalCheckpoints(
return;
}
}
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard indexShard = indexService.getShard(shardId.id());

checkpointsByIndexOfThisNode.computeIfAbsent(shardId.getIndexName(), k -> {
long[] seqNumbers = new long[indexService.getIndexSettings().getNumberOfShards()];
Arrays.fill(seqNumbers, SequenceNumbers.UNASSIGNED_SEQ_NO);
return seqNumbers;
});
checkpointsByIndexOfThisNode.get(shardId.getIndexName())[shardId.getId()] = indexShard.seqNoStats().getGlobalCheckpoint();
++numProcessedShards;
try {
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard indexShard = indexService.getShard(shardId.id());

checkpointsByIndexOfThisNode.computeIfAbsent(shardId.getIndexName(), k -> {
long[] seqNumbers = new long[indexService.getIndexSettings().getNumberOfShards()];
Arrays.fill(seqNumbers, SequenceNumbers.UNASSIGNED_SEQ_NO);
return seqNumbers;
});
checkpointsByIndexOfThisNode.get(shardId.getIndexName())[shardId.getId()] = indexShard.seqNoStats().getGlobalCheckpoint();
++numProcessedShards;
} catch (Exception e) {
logger.atDebug()
.withThrowable(e)
.log("Failed to get checkpoint for shard [{}] and index [{}]", shardId.getId(), shardId.getIndexName());
listener.onFailure(e);
return;
}
}

listener.onResponse(new Response(checkpointsByIndexOfThisNode));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.SeqNoStats;
Expand Down Expand Up @@ -47,6 +48,8 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -68,35 +71,9 @@ public void setUp() throws Exception {
null,
(TaskManager) null
);
IndexShard indexShardA0 = mock(IndexShard.class);
when(indexShardA0.seqNoStats()).thenReturn(new SeqNoStats(3_000, 2_000, 3_000));
IndexShard indexShardA1 = mock(IndexShard.class);
when(indexShardA1.seqNoStats()).thenReturn(new SeqNoStats(3_000, 2_000, 3_001));
IndexShard indexShardB0 = mock(IndexShard.class);
when(indexShardB0.seqNoStats()).thenReturn(new SeqNoStats(3_000, 2_000, 4_000));
IndexShard indexShardB1 = mock(IndexShard.class);
when(indexShardB1.seqNoStats()).thenReturn(new SeqNoStats(3_000, 2_000, 4_001));
Settings commonIndexSettings = Settings.builder()
.put(SETTING_VERSION_CREATED, 1_000_000)
.put(SETTING_NUMBER_OF_SHARDS, 2)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.build();
IndexService indexServiceA = mock(IndexService.class);
when(indexServiceA.getIndexSettings()).thenReturn(
new IndexSettings(IndexMetadata.builder("my-index-A").settings(commonIndexSettings).build(), Settings.EMPTY)
);
when(indexServiceA.getShard(0)).thenReturn(indexShardA0);
when(indexServiceA.getShard(1)).thenReturn(indexShardA1);
IndexService indexServiceB = mock(IndexService.class);
when(indexServiceB.getIndexSettings()).thenReturn(
new IndexSettings(IndexMetadata.builder("my-index-B").settings(commonIndexSettings).build(), Settings.EMPTY)
);
when(indexServiceB.getShard(0)).thenReturn(indexShardB0);
when(indexServiceB.getShard(1)).thenReturn(indexShardB1);

indicesService = mock(IndicesService.class);
when(indicesService.clusterService()).thenReturn(clusterService);
when(indicesService.indexServiceSafe(new Index("my-index-A", "A"))).thenReturn(indexServiceA);
when(indicesService.indexServiceSafe(new Index("my-index-B", "B"))).thenReturn(indexServiceB);

task = new CancellableTask(123, "type", "action", "description", new TaskId("dummy-node:456"), Map.of());
clock = new FakeClock(Instant.now());
Expand All @@ -117,6 +94,7 @@ public void testGetGlobalCheckpointsWithHighTimeout() throws InterruptedExceptio
}

private void testGetGlobalCheckpointsSuccess(TimeValue timeout) throws InterruptedException {
mockIndexServiceResponse();
CountDownLatch latch = new CountDownLatch(1);
SetOnce<GetCheckpointNodeAction.Response> responseHolder = new SetOnce<>();
SetOnce<Exception> exceptionHolder = new SetOnce<>();
Expand All @@ -136,7 +114,38 @@ private void testGetGlobalCheckpointsSuccess(TimeValue timeout) throws Interrupt
assertThat(exceptionHolder.get(), is(nullValue()));
}

private void mockIndexServiceResponse() {
IndexShard indexShardA0 = mock(IndexShard.class);
when(indexShardA0.seqNoStats()).thenReturn(new SeqNoStats(3_000, 2_000, 3_000));
IndexShard indexShardA1 = mock(IndexShard.class);
when(indexShardA1.seqNoStats()).thenReturn(new SeqNoStats(3_000, 2_000, 3_001));
IndexShard indexShardB0 = mock(IndexShard.class);
when(indexShardB0.seqNoStats()).thenReturn(new SeqNoStats(3_000, 2_000, 4_000));
IndexShard indexShardB1 = mock(IndexShard.class);
when(indexShardB1.seqNoStats()).thenReturn(new SeqNoStats(3_000, 2_000, 4_001));
Settings commonIndexSettings = Settings.builder()
.put(SETTING_VERSION_CREATED, 1_000_000)
.put(SETTING_NUMBER_OF_SHARDS, 2)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.build();
IndexService indexServiceA = mock(IndexService.class);
when(indexServiceA.getIndexSettings()).thenReturn(
new IndexSettings(IndexMetadata.builder("my-index-A").settings(commonIndexSettings).build(), Settings.EMPTY)
);
when(indexServiceA.getShard(0)).thenReturn(indexShardA0);
when(indexServiceA.getShard(1)).thenReturn(indexShardA1);
IndexService indexServiceB = mock(IndexService.class);
when(indexServiceB.getIndexSettings()).thenReturn(
new IndexSettings(IndexMetadata.builder("my-index-B").settings(commonIndexSettings).build(), Settings.EMPTY)
);
when(indexServiceB.getShard(0)).thenReturn(indexShardB0);
when(indexServiceB.getShard(1)).thenReturn(indexShardB1);
when(indicesService.indexServiceSafe(new Index("my-index-A", "A"))).thenReturn(indexServiceA);
when(indicesService.indexServiceSafe(new Index("my-index-B", "B"))).thenReturn(indexServiceB);
}

public void testGetGlobalCheckpointsFailureDueToTaskCancelled() throws InterruptedException {
mockIndexServiceResponse();
TaskCancelHelper.cancel(task, "due to apocalypse");

CountDownLatch latch = new CountDownLatch(1);
Expand All @@ -156,6 +165,7 @@ public void testGetGlobalCheckpointsFailureDueToTaskCancelled() throws Interrupt
}

public void testGetGlobalCheckpointsFailureDueToTimeout() throws InterruptedException {
mockIndexServiceResponse();
// Move the current time past the timeout.
clock.advanceTimeBy(Duration.ofSeconds(10));

Expand Down Expand Up @@ -184,4 +194,24 @@ public void testGetGlobalCheckpointsFailureDueToTimeout() throws InterruptedExce
is(equalTo("Transform checkpointing timed out on node [dummy-node] after [5s] having processed [0] of [4] shards"))
);
}

public void testIndexNotFoundException() throws InterruptedException {
var expectedException = new IndexNotFoundException("some index");
when(indicesService.indexServiceSafe(any())).thenThrow(expectedException);

var exceptionHolder = new SetOnce<Exception>();
TransportGetCheckpointNodeAction.getGlobalCheckpoints(
indicesService,
task,
shards,
TimeValue.timeValueSeconds(5),
clock,
ActionListener.wrap(r -> {
fail("Test is meant to call the onFailure method.");
}, exceptionHolder::set)
);

assertNotNull("Listener's onFailure handler was not called.", exceptionHolder.get());
assertThat(exceptionHolder.get(), sameInstance(expectedException));
}
}

0 comments on commit 6ecff09

Please sign in to comment.