From: Robert Varga Date: Mon, 7 Aug 2017 16:16:23 +0000 (+0200) Subject: BUG-8941: enqueue purges once ask-based transactions resolve X-Git-Tag: release/nitrogen~11 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=df3361f6703078faff9c4b029d0021a63ebe6e6e BUG-8941: enqueue purges once ask-based transactions resolve Backend state tracking relies on the transaction log to propagate transaction state from the leader to followers. This includes purging of transactions, i.e. the information that the frontend will not need the state (and the final resolution of the transaction). Tell-based protocol handles this on the frontend, ask-based needs to do this on the backend (as it has no notion of transaction continuation). Change-Id: I49e787b38998ef67b4a9ef504a70822263e1a340 Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java b/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java index 41cedd1fd6..d1323d48cd 100644 --- a/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java +++ b/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java @@ -376,12 +376,12 @@ public class ClusterAdminRpcServiceTest { // Write data to member-3's oper datastore and read/verify via member-2 writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore()); - // Verify all data has been replicated. We expect 3 log entries and thus last applied index of 2 - - // 2 ServerConfigurationPayload entries and the transaction payload entry. + // Verify all data has been replicated. We expect 4 log entries and thus last applied index of 3 - + // 2 ServerConfigurationPayload entries, the transaction payload entry plus a purge payload. RaftStateVerifier verifier = raftState -> { - assertEquals("Commit index", 2, raftState.getCommitIndex()); - assertEquals("Last applied index", 2, raftState.getLastApplied()); + assertEquals("Commit index", 3, raftState.getCommitIndex()); + assertEquals("Last applied index", 3, raftState.getLastApplied()); }; verifyRaftState(leaderNode1.configDataStore(), "cars", verifier); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index bf5d271bfc..080d3eec23 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -318,6 +318,7 @@ final class ShardCommitCoordinator { final TransactionIdentifier txId = cohortEntry.getTransactionId(); log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result, sender); + cohortEntry.getShard().getDataStore().purgeTransaction(txId, null); cohortCache.remove(cohortEntry.getTransactionId()); sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), @@ -326,8 +327,9 @@ final class ShardCommitCoordinator { @Override public void onFailure(final Throwable failure) { - log.error("{}, An exception occurred while committing transaction {}", persistenceId(), - cohortEntry.getTransactionId(), failure); + final TransactionIdentifier txId = cohortEntry.getTransactionId(); + log.error("{}, An exception occurred while committing transaction {}", persistenceId(), txId, failure); + cohortEntry.getShard().getDataStore().purgeTransaction(txId, null); cohortCache.remove(cohortEntry.getTransactionId()); sender.tell(new Failure(failure), cohortEntry.getShard().self()); @@ -371,6 +373,8 @@ final class ShardCommitCoordinator { cohortEntry.abort(new FutureCallback() { @Override public void onSuccess(final Void result) { + shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null); + if (sender != null) { sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self); } @@ -379,6 +383,7 @@ final class ShardCommitCoordinator { @Override public void onFailure(final Throwable failure) { log.error("{}: An exception happened during abort", name, failure); + shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null); if (sender != null) { sender.tell(new Failure(failure), self); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 32b9900b0b..e84381f989 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -691,7 +691,7 @@ public class ShardTest extends AbstractShardTest { verifyOuterListEntry(shard, 1); - verifyLastApplied(shard, 2); + verifyLastApplied(shard, 5); } }; } @@ -1195,7 +1195,7 @@ public class ShardTest extends AbstractShardTest { // Commit index should advance as we do not have an empty // modification - assertEquals(0, shardStats.getCommitIndex()); + assertEquals(1, shardStats.getCommitIndex()); } }; }