Disable state tracking on ReadyLocalTransaction 03/98403/1
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 11 Nov 2021 08:22:08 +0000 (09:22 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 11 Nov 2021 08:23:25 +0000 (09:23 +0100)
We are failing to recognize ReadyLocalTransaction as an ask-based
message, leading to us populating state.

JIRA: CONTROLLER-2021
Change-Id: I48b3c57e11430a5ca15e0559f2a8e13627f4bab1
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendMetadata.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java

index d5ecbf6..c8341e2 100644 (file)
@@ -133,7 +133,7 @@ final class FrontendMetadata extends ShardDataTreeMetadata<FrontendShardDataTree
         final FrontendIdentifier frontendId = clientId.getFrontendId();
         final FrontendClientMetadataBuilder client = clients.get(frontendId);
         if (client == null) {
-            // When we havent seen the client before, we still need to disable tracking for him since this only gets
+            // When we have not seen the client before, we still need to disable tracking for him since this only gets
             // triggered once.
             LOG.debug("{}: disableTracking {} does not match any client, pre-disabling client.", shardName, clientId);
             clients.put(frontendId, new FrontendClientMetadataBuilder.Disabled(shardName, clientId));
index 357df4a..a4d5242 100644 (file)
@@ -221,12 +221,12 @@ public class Shard extends RaftActor {
         super(builder.getId().toString(), builder.getPeerAddresses(),
                 Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
 
-        this.name = builder.getId().toString();
-        this.shardName = builder.getId().getShardName();
-        this.datastoreContext = builder.getDatastoreContext();
-        this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
-        this.frontendMetadata = new FrontendMetadata(name);
-        this.exportOnRecovery = datastoreContext.getExportOnRecovery();
+        name = builder.getId().toString();
+        shardName = builder.getId().getShardName();
+        datastoreContext = builder.getDatastoreContext();
+        restoreFromSnapshot = builder.getRestoreFromSnapshot();
+        frontendMetadata = new FrontendMetadata(name);
+        exportOnRecovery = datastoreContext.getExportOnRecovery();
 
         switch (exportOnRecovery) {
             case Json:
@@ -261,7 +261,7 @@ public class Shard extends RaftActor {
             getContext().become(new MeteringBehavior(this));
         }
 
-        commitCoordinator = new ShardCommitCoordinator(store, LOG, this.name);
+        commitCoordinator = new ShardCommitCoordinator(store, LOG, name);
 
         setTransactionCommitTimeout();
 
@@ -277,16 +277,16 @@ public class Shard extends RaftActor {
                 self(), getContext(), shardMBean, builder.getId().getShardName());
 
         snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG,
-            this.name, datastoreContext);
+            name, datastoreContext);
 
         messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
 
-        responseMessageSlicer = MessageSlicer.builder().logContext(this.name)
+        responseMessageSlicer = MessageSlicer.builder().logContext(name)
                 .messageSliceSize(datastoreContext.getMaximumMessageSliceSize())
                 .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
                 .expireStateAfterInactivity(2, TimeUnit.MINUTES).build();
 
-        requestMessageAssembler = MessageAssembler.builder().logContext(this.name)
+        requestMessageAssembler = MessageAssembler.builder().logContext(name)
                 .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
                 .assembledMessageCallback((message, sender) -> self().tell(message, sender))
                 .expireStateAfterInactivity(datastoreContext.getRequestTimeout(), TimeUnit.NANOSECONDS).build();
@@ -818,22 +818,23 @@ public class Shard extends RaftActor {
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
-        LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), message.getTransactionId());
+        final TransactionIdentifier txId = message.getTransactionId();
+        LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), txId);
 
         boolean isLeaderActive = isLeaderActive();
         if (isLeader() && isLeaderActive) {
+            askProtocolEncountered(txId);
             try {
                 commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
             } catch (Exception e) {
-                LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
-                        message.getTransactionId(), e);
+                LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(), txId, e);
                 getSender().tell(new Failure(e), getSelf());
             }
         } else {
             ActorSelection leader = getLeader();
             if (!isLeaderActive || leader == null) {
                 messageRetrySupport.addMessageToRetry(message, getSender(),
-                        "Could not process ready local transaction " + message.getTransactionId());
+                        "Could not process ready local transaction " + txId);
             } else {
                 LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader);
                 message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
@@ -1119,7 +1120,7 @@ public class Shard extends RaftActor {
 
     @Override
     public final String persistenceId() {
-        return this.name;
+        return name;
     }
 
     @Override
index 11f0e3d..6563ff6 100644 (file)
@@ -15,7 +15,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeFalse;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.timeout;
@@ -305,8 +304,6 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     }
 
     private static void assertAskMetadata(final FrontendClientMetadata clientMeta) {
-        // FIXME: needs to be enabled
-        assumeFalse(true);
         // ask based should track no metadata
         assertEquals(List.of(), clientMeta.getCurrentHistories());
     }