X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FThreePhaseCommitCohortProxy.java;h=fdcd30602b8d17429c0bc29623f5efb917cef41e;hp=8f6271605727abba421dafd8bb9dee3a070bdd0c;hb=b5cb353e3553a39f576c284119af75ffa5ea66a9;hpb=b712eb01354ddb5878008e2a2e8f03fb19b92555 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index 8f62716057..fdcd30602b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -16,6 +16,7 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; import java.util.Iterator; @@ -42,24 +43,24 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< private static final MessageSupplier COMMIT_MESSAGE_SUPPLIER = new MessageSupplier() { @Override - public Object newMessage(TransactionIdentifier transactionId, short version) { + public Object newMessage(final TransactionIdentifier transactionId, final short version) { return new CommitTransaction(transactionId, version).toSerializable(); } @Override - public boolean isSerializedReplyType(Object reply) { + public boolean isSerializedReplyType(final Object reply) { return CommitTransactionReply.isSerializedType(reply); } }; private static final MessageSupplier ABORT_MESSAGE_SUPPLIER = new MessageSupplier() { @Override - public Object newMessage(TransactionIdentifier transactionId, short version) { + public Object newMessage(final TransactionIdentifier transactionId, final short version) { return new AbortTransaction(transactionId, version).toSerializable(); } @Override - public boolean isSerializedReplyType(Object reply) { + public boolean isSerializedReplyType(final Object reply) { return AbortTransactionReply.isSerializedType(reply); } }; @@ -70,8 +71,8 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< private final TransactionIdentifier transactionId; private volatile OperationCallback commitOperationCallback; - public ThreePhaseCommitCohortProxy(ActorContext actorContext, List cohorts, - TransactionIdentifier transactionId) { + public ThreePhaseCommitCohortProxy(final ActorContext actorContext, final List cohorts, + final TransactionIdentifier transactionId) { this.actorContext = actorContext; this.cohorts = cohorts; this.transactionId = Preconditions.checkNotNull(transactionId); @@ -91,7 +92,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< for (final CohortInfo info: cohorts) { info.getActorFuture().onComplete(new OnComplete() { @Override - public void onComplete(Throwable failure, ActorSelection actor) { + public void onComplete(final Throwable failure, final ActorSelection actor) { synchronized (lock) { boolean done = completed.decrementAndGet() == 0; if (failure != null) { @@ -128,15 +129,15 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< Futures.addCallback(resolveCohorts(), new FutureCallback() { @Override - public void onSuccess(Void notUsed) { + public void onSuccess(final Void notUsed) { finishCanCommit(returnFuture); } @Override - public void onFailure(Throwable failure) { + public void onFailure(final Throwable failure) { returnFuture.setException(failure); } - }); + }, MoreExecutors.directExecutor()); return returnFuture; } @@ -158,7 +159,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< final OnComplete onComplete = new OnComplete() { @Override - public void onComplete(Throwable failure, Object response) { + public void onComplete(final Throwable failure, final Object response) { if (failure != null) { LOG.debug("Tx {}: a canCommit cohort Future failed", transactionId, failure); @@ -200,7 +201,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< sendCanCommitTransaction(iterator.next(), onComplete); } - private void sendCanCommitTransaction(CohortInfo toCohortInfo, OnComplete onComplete) { + private void sendCanCommitTransaction(final CohortInfo toCohortInfo, final OnComplete onComplete) { CanCommitTransaction message = new CanCommitTransaction(transactionId, toCohortInfo.getActorVersion()); LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor()); @@ -210,7 +211,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< future.onComplete(onComplete, actorContext.getClientDispatcher()); } - private Future> invokeCohorts(MessageSupplier messageSupplier) { + private Future> invokeCohorts(final MessageSupplier messageSupplier) { List> futureList = Lists.newArrayListWithCapacity(cohorts.size()); for (CohortInfo cohort : cohorts) { Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion()); @@ -253,7 +254,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< } @SuppressWarnings("checkstyle:IllegalCatch") - private static boolean successfulFuture(ListenableFuture future) { + private static boolean successfulFuture(final ListenableFuture future) { if (!future.isDone()) { return false; } @@ -283,13 +284,13 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< } else { Futures.addCallback(future, new FutureCallback() { @Override - public void onSuccess(Void notUsed) { + public void onSuccess(final Void notUsed) { finishVoidOperation(operationName, messageSupplier, expectedResponseClass, propagateException, returnFuture, callback); } @Override - public void onFailure(Throwable failure) { + public void onFailure(final Throwable failure) { LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId, operationName, failure); if (propagateException) { @@ -298,13 +299,13 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< returnFuture.set(null); } } - }); + }, MoreExecutors.directExecutor()); } return returnFuture; } - private void finishVoidOperation(final String operationName, MessageSupplier messageSupplier, + private void finishVoidOperation(final String operationName, final MessageSupplier messageSupplier, final Class expectedResponseClass, final boolean propagateException, final SettableFuture returnFuture, final OperationCallback callback) { LOG.debug("Tx {} finish {}", transactionId, operationName); @@ -315,7 +316,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< combinedFuture.onComplete(new OnComplete>() { @Override - public void onComplete(Throwable failure, Iterable responses) throws Throwable { + public void onComplete(final Throwable failure, final Iterable responses) throws Throwable { Throwable exceptionToPropagate = failure; if (exceptionToPropagate == null) { for (Object response: responses) { @@ -367,7 +368,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< private volatile ActorSelection resolvedActor; private final Supplier actorVersionSupplier; - CohortInfo(Future actorFuture, Supplier actorVersionSupplier) { + CohortInfo(final Future actorFuture, final Supplier actorVersionSupplier) { this.actorFuture = actorFuture; this.actorVersionSupplier = actorVersionSupplier; } @@ -380,7 +381,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< return resolvedActor; } - void setResolvedActor(ActorSelection resolvedActor) { + void setResolvedActor(final ActorSelection resolvedActor) { this.resolvedActor = resolvedActor; }