BUG-8941: enqueue purges once ask-based transactions resolve 84/61284/5
authorRobert Varga <robert.varga@pantheon.tech>
Mon, 7 Aug 2017 16:16:23 +0000 (18:16 +0200)
committerStephen Kitt <skitt@redhat.com>
Fri, 11 Aug 2017 11:46:50 +0000 (11:46 +0000)
Backend state tracking relies on the transaction log to propagate
transaction state from the leader to followers. This includes purging
of transactions, i.e. the information that the frontend will not need
the state (and the final resolution of the transaction).

Tell-based protocol handles this on the frontend, ask-based needs to
do this on the backend (as it has no notion of transaction continuation).

Change-Id: I49e787b38998ef67b4a9ef504a70822263e1a340
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index 41cedd1..d1323d4 100644 (file)
@@ -376,12 +376,12 @@ public class ClusterAdminRpcServiceTest {
         // Write data to member-3's oper datastore and read/verify via member-2
         writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore());
 
-        // Verify all data has been replicated. We expect 3 log entries and thus last applied index of 2 -
-        // 2 ServerConfigurationPayload entries and the transaction payload entry.
+        // Verify all data has been replicated. We expect 4 log entries and thus last applied index of 3 -
+        // 2 ServerConfigurationPayload entries,  the transaction payload entry plus a purge payload.
 
         RaftStateVerifier verifier = raftState -> {
-            assertEquals("Commit index", 2, raftState.getCommitIndex());
-            assertEquals("Last applied index", 2, raftState.getLastApplied());
+            assertEquals("Commit index", 3, raftState.getCommitIndex());
+            assertEquals("Last applied index", 3, raftState.getLastApplied());
         };
 
         verifyRaftState(leaderNode1.configDataStore(), "cars", verifier);
index bf5d271..080d3ee 100644 (file)
@@ -318,6 +318,7 @@ final class ShardCommitCoordinator {
                 final TransactionIdentifier txId = cohortEntry.getTransactionId();
                 log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result,
                     sender);
+                cohortEntry.getShard().getDataStore().purgeTransaction(txId, null);
 
                 cohortCache.remove(cohortEntry.getTransactionId());
                 sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
@@ -326,8 +327,9 @@ final class ShardCommitCoordinator {
 
             @Override
             public void onFailure(final Throwable failure) {
-                log.error("{}, An exception occurred while committing transaction {}", persistenceId(),
-                        cohortEntry.getTransactionId(), failure);
+                final TransactionIdentifier txId = cohortEntry.getTransactionId();
+                log.error("{}, An exception occurred while committing transaction {}", persistenceId(), txId, failure);
+                cohortEntry.getShard().getDataStore().purgeTransaction(txId, null);
 
                 cohortCache.remove(cohortEntry.getTransactionId());
                 sender.tell(new Failure(failure), cohortEntry.getShard().self());
@@ -371,6 +373,8 @@ final class ShardCommitCoordinator {
         cohortEntry.abort(new FutureCallback<Void>() {
             @Override
             public void onSuccess(final Void result) {
+                shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null);
+
                 if (sender != null) {
                     sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
                 }
@@ -379,6 +383,7 @@ final class ShardCommitCoordinator {
             @Override
             public void onFailure(final Throwable failure) {
                 log.error("{}: An exception happened during abort", name, failure);
+                shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null);
 
                 if (sender != null) {
                     sender.tell(new Failure(failure), self);
index 32b9900..e84381f 100644 (file)
@@ -691,7 +691,7 @@ public class ShardTest extends AbstractShardTest {
 
                 verifyOuterListEntry(shard, 1);
 
-                verifyLastApplied(shard, 2);
+                verifyLastApplied(shard, 5);
             }
         };
     }
@@ -1195,7 +1195,7 @@ public class ShardTest extends AbstractShardTest {
 
                 // Commit index should advance as we do not have an empty
                 // modification
-                assertEquals(0, shardStats.getCommitIndex());
+                assertEquals(1, shardStats.getCommitIndex());
             }
         };
     }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.