BUG-8639: always invalidate primary info cache
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxy.java
index 8d85bdcb666a24755a567db6fffa7d18c1fb9feb..8f6271605727abba421dafd8bb9dee3a070bdd0c 100644 (file)
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
@@ -33,7 +34,7 @@ import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 
 /**
- * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
+ * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies.
  */
 public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<ActorSelection> {
 
@@ -41,7 +42,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
 
     private static final MessageSupplier COMMIT_MESSAGE_SUPPLIER = new MessageSupplier() {
         @Override
-        public Object newMessage(String transactionId, short version) {
+        public Object newMessage(TransactionIdentifier transactionId, short version) {
             return new CommitTransaction(transactionId, version).toSerializable();
         }
 
@@ -53,7 +54,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
 
     private static final MessageSupplier ABORT_MESSAGE_SUPPLIER = new MessageSupplier() {
         @Override
-        public Object newMessage(String transactionId, short version) {
+        public Object newMessage(TransactionIdentifier transactionId, short version) {
             return new AbortTransaction(transactionId, version).toSerializable();
         }
 
@@ -66,39 +67,41 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
     private final ActorContext actorContext;
     private final List<CohortInfo> cohorts;
     private final SettableFuture<Void> cohortsResolvedFuture = SettableFuture.create();
-    private final String transactionId;
+    private final TransactionIdentifier transactionId;
     private volatile OperationCallback commitOperationCallback;
 
-    public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<CohortInfo> cohorts, String transactionId) {
+    public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<CohortInfo> cohorts,
+            TransactionIdentifier transactionId) {
         this.actorContext = actorContext;
         this.cohorts = cohorts;
-        this.transactionId = transactionId;
+        this.transactionId = Preconditions.checkNotNull(transactionId);
 
-        if(cohorts.isEmpty()) {
+        if (cohorts.isEmpty()) {
             cohortsResolvedFuture.set(null);
         }
     }
 
     private ListenableFuture<Void> resolveCohorts() {
-        if(cohortsResolvedFuture.isDone()) {
+        if (cohortsResolvedFuture.isDone()) {
             return cohortsResolvedFuture;
         }
 
         final AtomicInteger completed = new AtomicInteger(cohorts.size());
-        for(final CohortInfo info: cohorts) {
+        final Object lock = new Object();
+        for (final CohortInfo info: cohorts) {
             info.getActorFuture().onComplete(new OnComplete<ActorSelection>() {
                 @Override
                 public void onComplete(Throwable failure, ActorSelection actor)  {
-                    synchronized(completed) {
+                    synchronized (lock) {
                         boolean done = completed.decrementAndGet() == 0;
-                        if(failure != null) {
+                        if (failure != null) {
                             LOG.debug("Tx {}: a cohort Future failed", transactionId, failure);
                             cohortsResolvedFuture.setException(failure);
-                        } else if(!cohortsResolvedFuture.isDone()) {
+                        } else if (!cohortsResolvedFuture.isDone()) {
                             LOG.debug("Tx {}: cohort actor {} resolved", transactionId, actor);
 
                             info.setResolvedActor(actor);
-                            if(done) {
+                            if (done) {
                                 LOG.debug("Tx {}: successfully resolved all cohort actors", transactionId);
                                 cohortsResolvedFuture.set(null);
                             }
@@ -142,7 +145,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
         LOG.debug("Tx {} finishCanCommit", transactionId);
 
         // For empty transactions return immediately
-        if(cohorts.size() == 0){
+        if (cohorts.size() == 0) {
             LOG.debug("Tx {}: canCommit returning result true", transactionId);
             returnFuture.set(Boolean.TRUE);
             return;
@@ -184,7 +187,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
                     return;
                 }
 
-                if(iterator.hasNext() && result) {
+                if (iterator.hasNext() && result) {
                     sendCanCommitTransaction(iterator.next(), this);
                 } else {
                     LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
@@ -200,9 +203,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
     private void sendCanCommitTransaction(CohortInfo toCohortInfo, OnComplete<Object> onComplete) {
         CanCommitTransaction message = new CanCommitTransaction(transactionId, toCohortInfo.getActorVersion());
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor());
-        }
+        LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor());
 
         Future<Object> future = actorContext.executeOperationAsync(toCohortInfo.getResolvedActor(),
                 message.toSerializable(), actorContext.getTransactionCommitOperationTimeout());
@@ -211,12 +212,10 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
 
     private Future<Iterable<Object>> invokeCohorts(MessageSupplier messageSupplier) {
         List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
-        for(CohortInfo cohort : cohorts) {
+        for (CohortInfo cohort : cohorts) {
             Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion());
 
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort);
-            }
+            LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort);
 
             futureList.add(actorContext.executeOperationAsync(cohort.getResolvedActor(), message,
                     actorContext.getTransactionCommitOperationTimeout()));
@@ -253,15 +252,16 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
                 CommitTransactionReply.class, true, operationCallback);
     }
 
+    @SuppressWarnings("checkstyle:IllegalCatch")
     private static boolean successfulFuture(ListenableFuture<Void> future) {
-        if(!future.isDone()) {
+        if (!future.isDone()) {
             return false;
         }
 
         try {
             future.get();
             return true;
-        } catch(Exception e) {
+        } catch (Exception e) {
             return false;
         }
     }
@@ -277,7 +277,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
         // if not for some reason, we'll try to build it here.
 
         ListenableFuture<Void> future = resolveCohorts();
-        if(successfulFuture(future)) {
+        if (successfulFuture(future)) {
             finishVoidOperation(operationName, messageSupplier, expectedResponseClass, propagateException,
                     returnFuture, callback);
         } else {
@@ -292,7 +292,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
                 public void onFailure(Throwable failure) {
                     LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId, operationName, failure);
 
-                    if(propagateException) {
+                    if (propagateException) {
                         returnFuture.setException(failure);
                     } else {
                         returnFuture.set(null);
@@ -317,9 +317,9 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
             @Override
             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
                 Throwable exceptionToPropagate = failure;
-                if(exceptionToPropagate == null) {
-                    for(Object response: responses) {
-                        if(!response.getClass().equals(expectedResponseClass)) {
+                if (exceptionToPropagate == null) {
+                    for (Object response: responses) {
+                        if (!response.getClass().equals(expectedResponseClass)) {
                             exceptionToPropagate = new IllegalArgumentException(
                                     String.format("Unexpected response type %s", response.getClass()));
                             break;
@@ -327,9 +327,9 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
                     }
                 }
 
-                if(exceptionToPropagate != null) {
+                if (exceptionToPropagate != null) {
                     LOG.debug("Tx {}: a {} cohort Future failed", transactionId, operationName, exceptionToPropagate);
-                    if(propagateException) {
+                    if (propagateException) {
                         // We don't log the exception here to avoid redundant logging since we're
                         // propagating to the caller in MD-SAL core who will log it.
                         returnFuture.setException(exceptionToPropagate);
@@ -355,7 +355,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
     @Override
     List<Future<ActorSelection>> getCohortFutures() {
         List<Future<ActorSelection>> cohortFutures = new ArrayList<>(cohorts.size());
-        for(CohortInfo info: cohorts) {
+        for (CohortInfo info: cohorts) {
             cohortFutures.add(info.getActorFuture());
         }
 
@@ -392,7 +392,8 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
     }
 
     private interface MessageSupplier {
-        Object newMessage(String transactionId, short version);
+        Object newMessage(TransactionIdentifier transactionId, short version);
+
         boolean isSerializedReplyType(Object reply);
     }
 }