BUG-8941: enqueue purges once ask-based transactions resolve 84/61284/5
authorRobert Varga <robert.varga@pantheon.tech>
Mon, 7 Aug 2017 16:16:23 +0000 (18:16 +0200)
committerStephen Kitt <skitt@redhat.com>
Fri, 11 Aug 2017 11:46:50 +0000 (11:46 +0000)
Backend state tracking relies on the transaction log to propagate
transaction state from the leader to followers. This includes purging
of transactions, i.e. the information that the frontend will not need
the state (and the final resolution of the transaction).

Tell-based protocol handles this on the frontend, ask-based needs to
do this on the backend (as it has no notion of transaction continuation).

Change-Id: I49e787b38998ef67b4a9ef504a70822263e1a340
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index 41cedd1fd6a7930245004707f3a0d9758b48d473..d1323d48cdda375e95da7ab094bce84fd2bb04b9 100644 (file)
@@ -376,12 +376,12 @@ public class ClusterAdminRpcServiceTest {
         // Write data to member-3's oper datastore and read/verify via member-2
         writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore());
 
-        // Verify all data has been replicated. We expect 3 log entries and thus last applied index of 2 -
-        // 2 ServerConfigurationPayload entries and the transaction payload entry.
+        // Verify all data has been replicated. We expect 4 log entries and thus last applied index of 3 -
+        // 2 ServerConfigurationPayload entries,  the transaction payload entry plus a purge payload.
 
         RaftStateVerifier verifier = raftState -> {
-            assertEquals("Commit index", 2, raftState.getCommitIndex());
-            assertEquals("Last applied index", 2, raftState.getLastApplied());
+            assertEquals("Commit index", 3, raftState.getCommitIndex());
+            assertEquals("Last applied index", 3, raftState.getLastApplied());
         };
 
         verifyRaftState(leaderNode1.configDataStore(), "cars", verifier);
index bf5d271bfc418a87288f2760b1d6d73233ae9e30..080d3eec23a22ef3f7cb9771d78f42e905d08dc1 100644 (file)
@@ -318,6 +318,7 @@ final class ShardCommitCoordinator {
                 final TransactionIdentifier txId = cohortEntry.getTransactionId();
                 log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result,
                     sender);
+                cohortEntry.getShard().getDataStore().purgeTransaction(txId, null);
 
                 cohortCache.remove(cohortEntry.getTransactionId());
                 sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
@@ -326,8 +327,9 @@ final class ShardCommitCoordinator {
 
             @Override
             public void onFailure(final Throwable failure) {
-                log.error("{}, An exception occurred while committing transaction {}", persistenceId(),
-                        cohortEntry.getTransactionId(), failure);
+                final TransactionIdentifier txId = cohortEntry.getTransactionId();
+                log.error("{}, An exception occurred while committing transaction {}", persistenceId(), txId, failure);
+                cohortEntry.getShard().getDataStore().purgeTransaction(txId, null);
 
                 cohortCache.remove(cohortEntry.getTransactionId());
                 sender.tell(new Failure(failure), cohortEntry.getShard().self());
@@ -371,6 +373,8 @@ final class ShardCommitCoordinator {
         cohortEntry.abort(new FutureCallback<Void>() {
             @Override
             public void onSuccess(final Void result) {
+                shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null);
+
                 if (sender != null) {
                     sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
                 }
@@ -379,6 +383,7 @@ final class ShardCommitCoordinator {
             @Override
             public void onFailure(final Throwable failure) {
                 log.error("{}: An exception happened during abort", name, failure);
+                shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null);
 
                 if (sender != null) {
                     sender.tell(new Failure(failure), self);
index 32b9900b0bdfc39d6dbed55c6fa23cbd883feab3..e84381f989e5946cea8de3ab18e545c937d989d8 100644 (file)
@@ -691,7 +691,7 @@ public class ShardTest extends AbstractShardTest {
 
                 verifyOuterListEntry(shard, 1);
 
-                verifyLastApplied(shard, 2);
+                verifyLastApplied(shard, 5);
             }
         };
     }
@@ -1195,7 +1195,7 @@ public class ShardTest extends AbstractShardTest {
 
                 // Commit index should advance as we do not have an empty
                 // modification
-                assertEquals(0, shardStats.getCommitIndex());
+                assertEquals(1, shardStats.getCommitIndex());
             }
         };
     }