Move commit payload propagation
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardDataTree.java
index e1c12cd489c3cb6f14164380cbff005512c50431..27a3359910480986909d2d4b85de1b0f5cfa25ef 100644 (file)
@@ -365,7 +365,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
-    private void applyReplicatedCandidate(final Identifier identifier, final DataTreeCandidate foreign)
+    private void applyReplicatedCandidate(final TransactionIdentifier identifier, final DataTreeCandidate foreign)
             throws DataValidationFailedException {
         LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
 
@@ -378,6 +378,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         final DataTreeCandidate candidate = dataTree.prepare(mod);
         dataTree.commit(candidate);
 
+        allMetadataCommittedTransaction(identifier);
         notifyListeners(candidate);
     }
 
@@ -404,18 +405,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
          * pre-Boron state -- which limits the number of options here.
          */
         if (payload instanceof CommitTransactionPayload) {
-            final TransactionIdentifier txId;
             if (identifier == null) {
                 final Entry<TransactionIdentifier, DataTreeCandidate> e =
                         ((CommitTransactionPayload) payload).getCandidate();
-                txId = e.getKey();
-                applyReplicatedCandidate(txId, e.getValue());
+                applyReplicatedCandidate(e.getKey(), e.getValue());
             } else {
                 Verify.verify(identifier instanceof TransactionIdentifier);
-                txId = (TransactionIdentifier) identifier;
-                payloadReplicationComplete(txId);
+                payloadReplicationComplete((TransactionIdentifier) identifier);
             }
-            allMetadataCommittedTransaction(txId);
         } else if (payload instanceof AbortTransactionPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((AbortTransactionPayload) payload);
@@ -467,12 +464,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         final CommitEntry current = pendingFinishCommits.peek();
         if (current == null) {
             LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
+            allMetadataCommittedTransaction(txId);
             return;
         }
 
         if (!current.cohort.getIdentifier().equals(txId)) {
             LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext,
                 current.cohort.getIdentifier(), txId);
+            allMetadataCommittedTransaction(txId);
             return;
         }
 
@@ -738,8 +737,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
                 // For debugging purposes, allow dumping of the modification. Coupled with the above
                 // precondition log, it should allow us to understand what went on.
-                LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification,
-                        dataTree);
+                LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", logContext, cohort.getIdentifier(),
+                    modification, dataTree);
                 cause = new TransactionCommitFailedException("Data did not pass validation for path " + e.getPath(), e);
             } catch (Exception e) {
                 LOG.warn("{}: Unexpected failure in validation phase", logContext, e);
@@ -873,7 +872,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         processNextPendingTransaction();
     }
 
-    private void insertEntry(Deque<CommitEntry> queue, CommitEntry entry, int atIndex) {
+    private void insertEntry(final Deque<CommitEntry> queue, final CommitEntry entry, final int atIndex) {
         if (atIndex == 0) {
             queue.addFirst(entry);
             return;
@@ -892,7 +891,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     private Collection<String> extractPrecedingShardNames(
-            java.util.Optional<SortedSet<String>> participatingShardNames) {
+            final java.util.Optional<SortedSet<String>> participatingShardNames) {
         return participatingShardNames.map((Function<SortedSet<String>, Collection<String>>)
             set -> set.headSet(shard.getShardName())).orElse(Collections.<String>emptyList());
     }
@@ -972,6 +971,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
 
+        allMetadataCommittedTransaction(txId);
         shard.getShardMBean().incrementCommittedTransactionCount();
         shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());