Trace the originating generation in RGE
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
index 33634b1d6c3e322642e9b458e05d59feb74a577e..0dd50d479068d0b92f1e601352b9b91577a13c01 100644 (file)
@@ -103,7 +103,7 @@ final class ShardCommitCoordinator {
         log.debug("{}: Readying transaction {}, client version {}", name,
                 ready.getTransactionId(), ready.getTxnClientVersion());
 
-        final ShardDataTreeCohort cohort = ready.getTransaction().ready();
+        final ShardDataTreeCohort cohort = ready.getTransaction().ready(ready.getParticipatingShardNames());
         final CohortEntry cohortEntry = CohortEntry.createReady(cohort, ready.getTxnClientVersion());
         cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
 
@@ -162,7 +162,7 @@ final class ShardCommitCoordinator {
             }
 
             cohortEntry.setDoImmediateCommit(batched.isDoCommitOnReady());
-            cohortEntry.ready(cohortDecorator);
+            cohortEntry.ready(batched.getParticipatingShardNames(), cohortDecorator);
 
             if (batched.isDoCommitOnReady()) {
                 cohortEntry.setReplySender(sender);
@@ -186,13 +186,14 @@ final class ShardCommitCoordinator {
      * @param shard the transaction's shard actor
      */
     void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) {
-        final ShardDataTreeCohort cohort = dataTree.createReadyCohort(message.getTransactionId(),
-            message.getModification());
+        final TransactionIdentifier txId = message.getTransactionId();
+        final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification(),
+                message.getParticipatingShardNames());
         final CohortEntry cohortEntry = CohortEntry.createReady(cohort, DataStoreVersions.CURRENT_VERSION);
         cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
 
-        log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionId());
+        log.debug("{}: Applying local modifications for Tx {}", name, txId);
 
         if (message.isDoCommitOnReady()) {
             cohortEntry.setReplySender(sender);
@@ -227,7 +228,9 @@ final class ShardCommitCoordinator {
 
         BatchedModifications last = newModifications.getLast();
         last.setDoCommitOnReady(from.isDoCommitOnReady());
-        last.setReady(from.isReady());
+        if (from.isReady()) {
+            last.setReady(from.getParticipatingShardNames());
+        }
         last.setTotalMessagesSent(newModifications.size());
         return newModifications;
     }
@@ -285,7 +288,7 @@ final class ShardCommitCoordinator {
         handleCanCommit(cohortEntry);
     }
 
-    private void doCommit(final CohortEntry cohortEntry) {
+    void doCommit(final CohortEntry cohortEntry) {
         log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionId());
 
         // We perform the preCommit phase here atomically with the commit phase. This is an
@@ -309,7 +312,7 @@ final class ShardCommitCoordinator {
         });
     }
 
-    private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final CohortEntry cohortEntry) {
+    void finishCommit(@Nonnull final ActorRef sender, @Nonnull final CohortEntry cohortEntry) {
         log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionId());
 
         cohortEntry.commit(new FutureCallback<UnsignedLong>() {
@@ -318,6 +321,7 @@ final class ShardCommitCoordinator {
                 final TransactionIdentifier txId = cohortEntry.getTransactionId();
                 log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result,
                     sender);
+                cohortEntry.getShard().getDataStore().purgeTransaction(txId, null);
 
                 cohortCache.remove(cohortEntry.getTransactionId());
                 sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
@@ -326,8 +330,9 @@ final class ShardCommitCoordinator {
 
             @Override
             public void onFailure(final Throwable failure) {
-                log.error("{}, An exception occurred while committing transaction {}", persistenceId(),
-                        cohortEntry.getTransactionId(), failure);
+                final TransactionIdentifier txId = cohortEntry.getTransactionId();
+                log.error("{}, An exception occurred while committing transaction {}", persistenceId(), txId, failure);
+                cohortEntry.getShard().getDataStore().purgeTransaction(txId, null);
 
                 cohortCache.remove(cohortEntry.getTransactionId());
                 sender.tell(new Failure(failure), cohortEntry.getShard().self());
@@ -371,6 +376,8 @@ final class ShardCommitCoordinator {
         cohortEntry.abort(new FutureCallback<Void>() {
             @Override
             public void onSuccess(final Void result) {
+                shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null);
+
                 if (sender != null) {
                     sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
                 }
@@ -379,6 +386,7 @@ final class ShardCommitCoordinator {
             @Override
             public void onFailure(final Throwable failure) {
                 log.error("{}: An exception happened during abort", name, failure);
+                shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null);
 
                 if (sender != null) {
                     sender.tell(new Failure(failure), self);
@@ -449,7 +457,7 @@ final class ShardCommitCoordinator {
             if (last != null) {
                 final boolean immediate = cohortEntry.isDoImmediateCommit();
                 last.setDoCommitOnReady(immediate);
-                last.setReady(true);
+                last.setReady(cohortEntry.getParticipatingShardNames());
                 last.setTotalMessagesSent(newMessages.size());
 
                 messages.addAll(newMessages);