Disable state tracking on ReadyLocalTransaction 36/98436/1
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 11 Nov 2021 08:22:08 +0000 (09:22 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 11 Nov 2021 12:54:17 +0000 (13:54 +0100)
We are failing to recognize ReadyLocalTransaction as an ask-based
message, leading to us populating state.

JIRA: CONTROLLER-2021
Change-Id: I48b3c57e11430a5ca15e0559f2a8e13627f4bab1
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 35b5462f1ebde45b569664af28199acb437a28d9)
(cherry picked from commit cec65b8fafb50088bc3eb6db615ac908db5122b0)

opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendMetadata.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java

index d5ecbf6ea416c6c283f0e06b5537293bbdef99e8..c8341e29bcf78581f90ab5097827f59bcc7e1234 100644 (file)
@@ -133,7 +133,7 @@ final class FrontendMetadata extends ShardDataTreeMetadata<FrontendShardDataTree
         final FrontendIdentifier frontendId = clientId.getFrontendId();
         final FrontendClientMetadataBuilder client = clients.get(frontendId);
         if (client == null) {
-            // When we havent seen the client before, we still need to disable tracking for him since this only gets
+            // When we have not seen the client before, we still need to disable tracking for him since this only gets
             // triggered once.
             LOG.debug("{}: disableTracking {} does not match any client, pre-disabling client.", shardName, clientId);
             clients.put(frontendId, new FrontendClientMetadataBuilder.Disabled(shardName, clientId));
index 886b26f96589625cfca5458bab81952731597ad4..ee4d16c9b23d8b74d4f28c46275c323c14677199 100644 (file)
@@ -214,11 +214,11 @@ public class Shard extends RaftActor {
         super(builder.getId().toString(), builder.getPeerAddresses(),
                 Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
 
-        this.name = builder.getId().toString();
-        this.shardName = builder.getId().getShardName();
-        this.datastoreContext = builder.getDatastoreContext();
-        this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
-        this.frontendMetadata = new FrontendMetadata(name);
+        name = builder.getId().toString();
+        shardName = builder.getId().getShardName();
+        datastoreContext = builder.getDatastoreContext();
+        restoreFromSnapshot = builder.getRestoreFromSnapshot();
+        frontendMetadata = new FrontendMetadata(name);
 
         setPersistence(datastoreContext.isPersistent());
 
@@ -242,7 +242,7 @@ public class Shard extends RaftActor {
             getContext().become(new MeteringBehavior(this));
         }
 
-        commitCoordinator = new ShardCommitCoordinator(store, LOG, this.name);
+        commitCoordinator = new ShardCommitCoordinator(store, LOG, name);
 
         setTransactionCommitTimeout();
 
@@ -258,16 +258,16 @@ public class Shard extends RaftActor {
                 self(), getContext(), shardMBean, builder.getId().getShardName());
 
         snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG,
-            this.name, datastoreContext);
+            name, datastoreContext);
 
         messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
 
-        responseMessageSlicer = MessageSlicer.builder().logContext(this.name)
+        responseMessageSlicer = MessageSlicer.builder().logContext(name)
                 .messageSliceSize(datastoreContext.getMaximumMessageSliceSize())
                 .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
                 .expireStateAfterInactivity(2, TimeUnit.MINUTES).build();
 
-        requestMessageAssembler = MessageAssembler.builder().logContext(this.name)
+        requestMessageAssembler = MessageAssembler.builder().logContext(name)
                 .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
                 .assembledMessageCallback((message, sender) -> self().tell(message, sender))
                 .expireStateAfterInactivity(datastoreContext.getRequestTimeout(), TimeUnit.NANOSECONDS).build();
@@ -778,22 +778,23 @@ public class Shard extends RaftActor {
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
-        LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), message.getTransactionId());
+        final TransactionIdentifier txId = message.getTransactionId();
+        LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), txId);
 
         boolean isLeaderActive = isLeaderActive();
         if (isLeader() && isLeaderActive) {
+            askProtocolEncountered(txId);
             try {
                 commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
             } catch (Exception e) {
-                LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
-                        message.getTransactionId(), e);
+                LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(), txId, e);
                 getSender().tell(new Failure(e), getSelf());
             }
         } else {
             ActorSelection leader = getLeader();
             if (!isLeaderActive || leader == null) {
                 messageRetrySupport.addMessageToRetry(message, getSender(),
-                        "Could not process ready local transaction " + message.getTransactionId());
+                        "Could not process ready local transaction " + txId);
             } else {
                 LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader);
                 message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
@@ -1085,8 +1086,8 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    public String persistenceId() {
-        return this.name;
+    public final String persistenceId() {
+        return name;
     }
 
     @Override
index 7bfc7baf27558a0db51197b73d4ed447cf309d29..ccd3ea1556e3986a106807c20537067d0d20a05b 100644 (file)
@@ -15,7 +15,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeFalse;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.timeout;
@@ -305,8 +304,6 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     }
 
     private static void assertAskMetadata(final FrontendClientMetadata clientMeta) {
-        // FIXME: needs to be enabled
-        assumeFalse(true);
         // ask based should track no metadata
         assertEquals(List.of(), clientMeta.getCurrentHistories());
     }