[BUG] JedisCheckpointStore - Invalid partitionOwnership data from CheckpointStore #40176

Closed
3 tasks done
MattWaterton-OAG opened this issue May 15, 2024 · 2 comments · Fixed by #40379
Labels
Client, customer-reported, Event Hubs, needs-team-attention, question

@MattWaterton-OAG

Describe the bug
I am currently building a proof of concept with the beta version of the Azure JedisCheckpointStore, following the instructions in these documents:

After following the instructions in the links above, the first run behaves correctly: partition ownership records and their corresponding checkpoints are written to Azure Cache for Redis. When the prototype is stopped and then restarted, however, the error below occurs. The same error appears when two instances of the client code (EventProcessorClient) are started to demonstrate load balancing. If checkpointing is disabled (that is, eventContext.updateCheckpoint() is never called), the error does not occur and load balancing works correctly.

The root cause appears to be in JedisCheckpointStore.claimOwnership(), though I am happy to be proved wrong. The error surfaces on startup, when JedisCheckpointStore.listOwnership() returns partitionOwnership objects (read from the Redis cache) that are missing two expected attributes: lastModifiedTime and eTag. com.azure.messaging.eventhubs.PartitionBasedLoadBalancer validates each partitionOwnership object with its isValid() method, and this check throws the error when it finds those attributes missing.
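To make the failure mode concrete, here is a minimal, self-contained sketch of the kind of check described above. It is an assumption-based model, not the SDK's actual isValid(): the Ownership class, isValid(), and validateAll() are hypothetical, and the only rule modeled is that a record read back from the store must carry both lastModifiedTime and eTag. The record shapes match the stored values shown later in this issue.

```java
import java.util.List;

// Hypothetical model of the validity check; NOT the SDK's PartitionBasedLoadBalancer code.
public class OwnershipCheck {

    // Hypothetical stand-in for PartitionOwnership, keeping only the fields relevant here.
    static final class Ownership {
        final String partitionId;
        final String ownerId;
        final Long lastModifiedTime; // null when the stored JSON has no "lastModifiedTime"
        final String eTag;           // null when the stored JSON has no "eTag"

        Ownership(String partitionId, String ownerId, Long lastModifiedTime, String eTag) {
            this.partitionId = partitionId;
            this.ownerId = ownerId;
            this.lastModifiedTime = lastModifiedTime;
            this.eTag = eTag;
        }
    }

    // Assumed rule: a record is valid only if lastModifiedTime and eTag were both present.
    static boolean isValid(Ownership o) {
        return o.partitionId != null && o.lastModifiedTime != null && o.eTag != null;
    }

    // Throws on the first invalid record, with the same message as the reported stack trace.
    static void validateAll(List<Ownership> ownerships) {
        for (Ownership o : ownerships) {
            if (!isValid(o)) {
                throw new IllegalStateException("Invalid partitionOwnership data from CheckpointStore");
            }
        }
    }

    public static void main(String[] args) {
        // Shape of the record stored before the fix: no lastModifiedTime or eTag.
        Ownership before = new Ownership("5", "8b74a637-cb0f-4856-be1e-02a16460e0f9", null, null);
        // Shape of the record stored after the fix.
        Ownership after = new Ownership("5", "564f01c4-de73-43da-8d70-87f931275c25", 1715614166L, "");

        validateAll(List.of(after)); // passes
        try {
            validateAll(List.of(after, before));
        } catch (IllegalStateException e) {
            // prints "Invalid partitionOwnership data from CheckpointStore"
            System.out.println(e.getMessage());
        }
    }
}
```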

I would be interested to know whether others see this as a bug. Thank you in advance.

Exception or Stack Trace
Error occurred in partition processor for partition NONE, java.lang.IllegalStateException: Invalid partitionOwnership data from CheckpointStore.
java.lang.IllegalStateException: Invalid partitionOwnership data from CheckpointStore
at com.azure.messaging.eventhubs.PartitionBasedLoadBalancer.lambda$loadBalance$6(PartitionBasedLoadBalancer.java:186)
at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:73)
at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:32)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:139)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:258)
at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:347)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:129)
at reactor.core.publisher.SerializedSubscriber.onComplete(SerializedSubscriber.java:146)
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onComplete(FluxTimeout.java:234)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:260)
at reactor.core.publisher.FluxIterable$IterableSubscription.fastPath(FluxIterable.java:424)
at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:291)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:150)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onSubscribe(MonoFlatMapMany.java:245)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:201)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:83)
at reactor.core.publisher.Flux.subscribe(Flux.java:8642)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:195)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
at reactor.core.publisher.MonoPublishOn$PublishOnSubscriber.run(MonoPublishOn.java:181)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)

To Reproduce
Steps to reproduce the behavior:

  1. Set up an Azure Cache for Redis.
  2. Set up an Event Hub with 32 partitions and populate it with data, ensuring the data is spread across all partitions.
  3. Set up a proof of concept project in IntelliJ following instructions in:
  4. Ensure the cache is clear of all keys by running the FLUSHALL command in the Redis Console in the Azure portal.
  5. Run the client code for several minutes against the Event Hub, which contains data on all 32 partitions.
  6. Stop the running code in the IDE.
  7. Start the client code a second time to reproduce the error.

Code Snippet
The suspected error is in the code below. I have marked the places where I believe a change should be made with 'Matt W' comments.

/**
 * This method returns the list of partitions that were owned successfully.
 *
 * @param requestedPartitionOwnerships List of partition ownerships from the current instance
 *
 * @return Flux of PartitionOwnership objects
 */
@Override
public Flux<PartitionOwnership> claimOwnership(List<PartitionOwnership> requestedPartitionOwnerships) {
    return Flux.fromIterable(requestedPartitionOwnerships).handle((partitionOwnership, sink) -> {
        String partitionId = partitionOwnership.getPartitionId();
        String fullyQualifiedNamespace = partitionOwnership.getFullyQualifiedNamespace();
        String eventHubName = partitionOwnership.getEventHubName();
        String consumerGroup = partitionOwnership.getConsumerGroup();

        byte[] key = keyBuilder(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId);
        //Matt W fix - remove serializedOwnership from here and move to inside the try block
        byte[] serializedOwnership = DEFAULT_SERIALIZER.serializeToBytes(partitionOwnership);

        try (Jedis jedis = jedisPool.getResource()) {
            // Start watching for any updates.
            jedis.watch(key);

            List<byte[]> keyInformation = jedis.hmget(key, PARTITION_OWNERSHIP);

            long lastModifiedTimeSeconds = Long.parseLong(jedis.time().get(0));
            partitionOwnership.setLastModifiedTime(lastModifiedTimeSeconds);
            partitionOwnership.setETag("");

            //Matt W fix - move serializedOwnership to here to include the LastModifiedTimeSeconds and ETag modified attributes
            //byte[] serializedOwnership = DEFAULT_SERIALIZER.serializeToBytes(partitionOwnership);

            // if PARTITION_OWNERSHIP field does not exist for member we will get a null. Try to add a new entry
            // and then return.
            if (keyInformation == null || keyInformation.isEmpty() || keyInformation.get(0) == null) {
                try {
                    long result = jedis.hsetnx(key, PARTITION_OWNERSHIP, serializedOwnership);

                    if (result == 1) {
                        sink.next(partitionOwnership);
                    } else {
                        // Sometime between fetching the ownership information and trying to set it, someone else
                        // updated/added ownership.
                        addEventHubInformation(LOGGER.atVerbose(), fullyQualifiedNamespace, eventHubName,
                                consumerGroup)
                                .addKeyValue(PARTITION_ID_KEY, partitionId)
                                .log("Unable to create new partition ownership entry.");

                        sink.error(new AzureException("Unable to claim partition: " + partitionId
                                + " Partition ownership created already."));
                    }
                } finally {
                    jedis.unwatch();
                }

                return;
            }

            // An entry for PARTITION_OWNERSHIP exists. We'll try to modify it.
            Transaction transaction = jedis.multi();
            transaction.hset(key, PARTITION_OWNERSHIP, serializedOwnership);

            // If at least one watched key is modified before the EXEC command, the whole transaction aborts, and
            // EXEC returns a Null reply to notify that the transaction failed.
            List<Object> executionResponse = transaction.exec();

            if (executionResponse == null) {
                // This means that the transaction did not execute, which implies that another client has
                // changed the ownership during this transaction
                sink.error(createClaimPartitionException(fullyQualifiedNamespace, eventHubName, consumerGroup,
                        partitionId, "Transaction was aborted."));
            } else if (executionResponse.isEmpty()) {
                sink.error(createClaimPartitionException(fullyQualifiedNamespace, eventHubName, consumerGroup,
                        partitionId, "No command results in transaction result."));
            } else if (executionResponse.get(0) == null) {
                sink.error(createClaimPartitionException(fullyQualifiedNamespace, eventHubName, consumerGroup,
                        partitionId, "Executing update command resulted in null."));
            } else {
                addEventHubInformation(LOGGER.atVerbose(), fullyQualifiedNamespace, eventHubName, consumerGroup)
                        .addKeyValue(PARTITION_ID_KEY, partitionId)
                        .log("Claimed partition.");

                sink.next(partitionOwnership);
            }
        }
    });
}

Expected behavior
When the client code (EventProcessorClient) is stopped and then started again, the error above ("Invalid partitionOwnership data from CheckpointStore") should not occur. The comments in the code snippet above show where I believe the code should change so that the lastModifiedTime and eTag attributes are set before the ownership object is serialized.
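The ordering problem can be reproduced without Redis or the SDK. The sketch below is illustrative only: Ownership, serialize(), serializeThenSet(), and setThenSerialize() are hypothetical names, and serialize() is a tiny hand-written JSON writer that omits null fields, chosen because the stored values shown below omit the two attributes entirely. It contrasts the order in the reported claimOwnership() (bytes captured before the setters run) with the order proposed in the 'Matt W' comments (setters first, then serialize).

```java
// Hypothetical sketch of the serialize-before-mutate problem; names are illustrative only.
public class SerializationOrder {

    // Hypothetical mutable stand-in for PartitionOwnership.
    static final class Ownership {
        final String partitionId;
        final String ownerId;
        Long lastModifiedTime; // set during the claim
        String eTag;           // set during the claim

        Ownership(String partitionId, String ownerId) {
            this.partitionId = partitionId;
            this.ownerId = ownerId;
        }
    }

    // Tiny JSON writer that omits null fields, matching the shape of the stored values in this issue.
    static String serialize(Ownership o) {
        StringBuilder sb = new StringBuilder("{");
        sb.append("\"partitionId\":\"").append(o.partitionId).append('"');
        sb.append(",\"ownerId\":\"").append(o.ownerId).append('"');
        if (o.lastModifiedTime != null) {
            sb.append(",\"lastModifiedTime\":").append(o.lastModifiedTime);
        }
        if (o.eTag != null) {
            sb.append(",\"eTag\":\"").append(o.eTag).append('"');
        }
        return sb.append('}').toString();
    }

    // Reported order: the value to write is captured before the fields are set,
    // so the later setters never reach what is stored.
    static String serializeThenSet(Ownership o, long nowSeconds) {
        String written = serialize(o);
        o.lastModifiedTime = nowSeconds;
        o.eTag = "";
        return written;
    }

    // Proposed order: set the fields first, then serialize.
    static String setThenSerialize(Ownership o, long nowSeconds) {
        o.lastModifiedTime = nowSeconds;
        o.eTag = "";
        return serialize(o);
    }

    public static void main(String[] args) {
        long now = 1715614166L;
        // prints {"partitionId":"5","ownerId":"owner-a"}
        System.out.println(serializeThenSet(new Ownership("5", "owner-a"), now));
        // prints {"partitionId":"5","ownerId":"owner-a","lastModifiedTime":1715614166,"eTag":""}
        System.out.println(setThenSerialize(new Ownership("5", "owner-a"), now));
    }
}
```

The in-memory object ends up with both fields in either case; only the serialized value differs, which is what a later read from the store would see.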

Before the fix, the stored PartitionOwnership object is missing the expected attributes (lastModifiedTime, eTag):

key: nsclienteu81fxxxxxx.servicebus.windows.net/eh178d50xxxx/consumergroup-amp/5
value:

{checkpoint={"fullyQualifiedNamespace":"nsclienteu81fxxxxxx.servicebus.windows.net","eventHubName":"eh178d50xxxx","consumerGroup":"consumergroup-amp","partitionId":"5","offset":9397390155864,"sequenceNumber":32695570}, partitionOwnership={"fullyQualifiedNamespace":"nsclienteu81fxxxxxx.servicebus.windows.net","eventHubName":"eh178d50xxxx","consumerGroup":"consumergroup-amp","partitionId":"5","ownerId":"8b74a637-cb0f-4856-be1e-02a16460e0f9"}}

After the fix, the stored PartitionOwnership object contains the expected attributes (lastModifiedTime, eTag):

key: nsclienteu81fxxxxxx.servicebus.windows.net/eh178d50xxxx/consumergroup-amp/5
value:

{checkpoint={"fullyQualifiedNamespace":"nsclienteu81fxxxxxx.servicebus.windows.net","eventHubName":"eh178d50xxxx","consumerGroup":"consumergroup-amp","partitionId":"5","offset":9397394785920,"sequenceNumber":32697650}, partitionOwnership={"fullyQualifiedNamespace":"nsclienteu81fxxxxxx.servicebus.windows.net","eventHubName":"eh178d50xxxx","consumerGroup":"consumergroup-amp","partitionId":"5","ownerId":"564f01c4-de73-43da-8d70-87f931275c25","lastModifiedTime":1715614166,"eTag":""}}

Screenshots
N/A

Setup:

  • OS: Windows 11
  • IDE: IntelliJ
  • Library/Libraries (pom.xml):
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>AlertsJedisPoC</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>AlertsJedisPoC</name>
  <url>http://maven.apache.org</url>

  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-sdk-bom</artifactId>
        <version>1.2.23</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.release>21</maven.compiler.release>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-messaging-eventhubs-checkpointstore-jedis</artifactId>
      <version>1.0.0-beta.2</version>
    </dependency>
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>5.1.2</version>
      <type>jar</type>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
      <version>2.23.0</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>
  • Java version: 21
  • App Server/Environment: local IDE
  • Frameworks: POJOs

Additional context
N/A

Information Checklist
Kindly make sure that you have added all of the following information above and checked off the required fields; otherwise, we will treat the issue as an incomplete report.

  • Bug Description Added
  • Repro Steps Added
  • Setup information Added
@github-actions bot added the Client, customer-reported, Event Hubs, needs-team-attention, and question labels on May 15, 2024
@anuchandy @conniey @lmolkova

Thank you for your feedback. Tagging and routing to the team member best able to assist.
