BUG 2792 : Serialize phase processing in ConcurrentDOMDatBroker
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ConcurrentDOMDataBroker.java
index 886c4730678208fdbf129be10463eddf252f5c0f..538f2981daae891406934ffcf9605a9aecdd72c2 100644 (file)
@@ -15,12 +15,12 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
@@ -91,8 +91,9 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
 
         final long startTime = System.nanoTime();
 
+        final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
+
         // Not using Futures.allAsList here to avoid its internal overhead.
-        final AtomicInteger remaining = new AtomicInteger(cohorts.size());
         FutureCallback<Boolean> futureCallback = new FutureCallback<Boolean>() {
             @Override
             public void onSuccess(Boolean result) {
@@ -102,9 +103,12 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
                             new TransactionCommitFailedException(
                                             "Can Commit failed, no detailed cause available."));
                 } else {
-                    if(remaining.decrementAndGet() == 0) {
+                    if(!cohortIterator.hasNext()) {
                         // All cohorts completed successfully - we can move on to the preCommit phase
                         doPreCommit(startTime, clientSubmitFuture, transaction, cohorts);
+                    } else {
+                        ListenableFuture<Boolean> canCommitFuture = cohortIterator.next().canCommit();
+                        Futures.addCallback(canCommitFuture, this, internalFutureCallbackExecutor);
                     }
                 }
             }
@@ -116,24 +120,26 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
             }
         };
 
-        for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
-            ListenableFuture<Boolean> canCommitFuture = cohort.canCommit();
-            Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor);
-        }
+        ListenableFuture<Boolean> canCommitFuture = cohortIterator.next().canCommit();
+        Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor);
     }
 
     private void doPreCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
             final DOMDataWriteTransaction transaction,
             final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
 
+        final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
+
         // Not using Futures.allAsList here to avoid its internal overhead.
-        final AtomicInteger remaining = new AtomicInteger(cohorts.size());
         FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
             @Override
             public void onSuccess(Void notUsed) {
-                if(remaining.decrementAndGet() == 0) {
+                if(!cohortIterator.hasNext()) {
                     // All cohorts completed successfully - we can move on to the commit phase
                     doCommit(startTime, clientSubmitFuture, transaction, cohorts);
+                } else {
+                    ListenableFuture<Void> preCommitFuture = cohortIterator.next().preCommit();
+                    Futures.addCallback(preCommitFuture, this, internalFutureCallbackExecutor);
                 }
             }
 
@@ -144,26 +150,28 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
             }
         };
 
-        for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
-            ListenableFuture<Void> preCommitFuture = cohort.preCommit();
-            Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor);
-        }
+        ListenableFuture<Void> preCommitFuture = cohortIterator.next().preCommit();
+        Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor);
     }
 
     private void doCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
             final DOMDataWriteTransaction transaction,
             final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
 
+        final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
+
         // Not using Futures.allAsList here to avoid its internal overhead.
-        final AtomicInteger remaining = new AtomicInteger(cohorts.size());
         FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
             @Override
             public void onSuccess(Void notUsed) {
-                if(remaining.decrementAndGet() == 0) {
+                if(!cohortIterator.hasNext()) {
                     // All cohorts completed successfully - we're done.
                     commitStatsTracker.addDuration(System.nanoTime() - startTime);
 
                     clientSubmitFuture.set();
+                } else {
+                    ListenableFuture<Void> commitFuture = cohortIterator.next().commit();
+                    Futures.addCallback(commitFuture, this, internalFutureCallbackExecutor);
                 }
             }
 
@@ -174,10 +182,8 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
             }
         };
 
-        for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
-            ListenableFuture<Void> commitFuture = cohort.commit();
-            Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor);
-        }
+        ListenableFuture<Void> commitFuture = cohortIterator.next().commit();
+        Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor);
     }
 
     private void handleException(final AsyncNotifyingSettableFuture clientSubmitFuture,