X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FThreePhaseCommitCohortProxy.java;h=17017b9b668ea699d1dab0ecfa7deac2448c84b4;hp=357ab92c819946ccc9e008db0a6b9fd1ad5bcf2f;hb=925cb4a228d0fda99c7bfeb432eb25285a223887;hpb=cab1d5845cb951fe31a3243653ed567583dc73c1 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index 357ab92c81..17017b9b66 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; import scala.concurrent.Future; /** - * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies + * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies. */ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort { @@ -76,31 +76,31 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< this.cohorts = cohorts; this.transactionId = Preconditions.checkNotNull(transactionId); - if(cohorts.isEmpty()) { + if (cohorts.isEmpty()) { cohortsResolvedFuture.set(null); } } private ListenableFuture resolveCohorts() { - if(cohortsResolvedFuture.isDone()) { + if (cohortsResolvedFuture.isDone()) { return cohortsResolvedFuture; } final AtomicInteger completed = new AtomicInteger(cohorts.size()); - for(final CohortInfo info: cohorts) { + for (final CohortInfo info: cohorts) { info.getActorFuture().onComplete(new OnComplete() { @Override public void onComplete(Throwable failure, ActorSelection actor) { - synchronized(completed) { + synchronized (completed) { boolean done = completed.decrementAndGet() == 0; - if(failure != null) { + if (failure != null) { LOG.debug("Tx {}: a cohort Future failed", transactionId, failure); cohortsResolvedFuture.setException(failure); - } else if(!cohortsResolvedFuture.isDone()) { + } else if (!cohortsResolvedFuture.isDone()) { LOG.debug("Tx {}: cohort actor {} resolved", transactionId, actor); info.setResolvedActor(actor); - if(done) { + if (done) { LOG.debug("Tx {}: successfully resolved all cohort actors", transactionId); cohortsResolvedFuture.set(null); } @@ -144,7 +144,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< LOG.debug("Tx {} finishCanCommit", transactionId); // For empty transactions return immediately - if(cohorts.size() == 0){ + if (cohorts.size() == 0) { LOG.debug("Tx {}: canCommit returning result true", transactionId); returnFuture.set(Boolean.TRUE); return; @@ -186,7 +186,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< return; } - if(iterator.hasNext() && result) { + if (iterator.hasNext() && result) { sendCanCommitTransaction(iterator.next(), this); } else { LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result); @@ -202,9 +202,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< private void sendCanCommitTransaction(CohortInfo toCohortInfo, OnComplete onComplete) { CanCommitTransaction message = new CanCommitTransaction(transactionId, toCohortInfo.getActorVersion()); - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor()); - } + LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor()); Future future = actorContext.executeOperationAsync(toCohortInfo.getResolvedActor(), message.toSerializable(), actorContext.getTransactionCommitOperationTimeout()); @@ -213,12 +211,10 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< private Future> invokeCohorts(MessageSupplier messageSupplier) { List> futureList = Lists.newArrayListWithCapacity(cohorts.size()); - for(CohortInfo cohort : cohorts) { + for (CohortInfo cohort : cohorts) { Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion()); - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort); - } + LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort); futureList.add(actorContext.executeOperationAsync(cohort.getResolvedActor(), message, actorContext.getTransactionCommitOperationTimeout())); @@ -255,15 +251,16 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< CommitTransactionReply.class, true, operationCallback); } + @SuppressWarnings("checkstyle:IllegalCatch") private static boolean successfulFuture(ListenableFuture future) { - if(!future.isDone()) { + if (!future.isDone()) { return false; } try { future.get(); return true; - } catch(Exception e) { + } catch (Exception e) { return false; } } @@ -279,7 +276,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< // if not for some reason, we'll try to build it here. ListenableFuture future = resolveCohorts(); - if(successfulFuture(future)) { + if (successfulFuture(future)) { finishVoidOperation(operationName, messageSupplier, expectedResponseClass, propagateException, returnFuture, callback); } else { @@ -294,7 +291,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< public void onFailure(Throwable failure) { LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId, operationName, failure); - if(propagateException) { + if (propagateException) { returnFuture.setException(failure); } else { returnFuture.set(null); @@ -319,9 +316,9 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< @Override public void onComplete(Throwable failure, Iterable responses) throws Throwable { Throwable exceptionToPropagate = failure; - if(exceptionToPropagate == null) { - for(Object response: responses) { - if(!response.getClass().equals(expectedResponseClass)) { + if (exceptionToPropagate == null) { + for (Object response: responses) { + if (!response.getClass().equals(expectedResponseClass)) { exceptionToPropagate = new IllegalArgumentException( String.format("Unexpected response type %s", response.getClass())); break; @@ -329,9 +326,9 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< } } - if(exceptionToPropagate != null) { + if (exceptionToPropagate != null) { LOG.debug("Tx {}: a {} cohort Future failed", transactionId, operationName, exceptionToPropagate); - if(propagateException) { + if (propagateException) { // We don't log the exception here to avoid redundant logging since we're // propagating to the caller in MD-SAL core who will log it. returnFuture.setException(exceptionToPropagate); @@ -357,7 +354,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< @Override List> getCohortFutures() { List> cohortFutures = new ArrayList<>(cohorts.size()); - for(CohortInfo info: cohorts) { + for (CohortInfo info: cohorts) { cohortFutures.add(info.getActorFuture()); } @@ -395,6 +392,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< private interface MessageSupplier { Object newMessage(TransactionIdentifier transactionId, short version); + boolean isSerializedReplyType(Object reply); } }