X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FThreePhaseCommitCohortProxy.java;h=cd79ad6f19ed32fdb37a6b8204e1de0a47ee2ccd;hp=8d85bdcb666a24755a567db6fffa7d18c1fb9feb;hb=e84f63ee098fff5b02cbce1281ca0d1208f966fa;hpb=600e07c2adb213b614cad127070f1e7ff074b42b diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index 8d85bdcb66..cd79ad6f19 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -5,35 +5,37 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.datastore; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorSelection; import akka.dispatch.OnComplete; -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; /** - * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies + * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies. */ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort { @@ -41,71 +43,73 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< private static final MessageSupplier COMMIT_MESSAGE_SUPPLIER = new MessageSupplier() { @Override - public Object newMessage(String transactionId, short version) { + public Object newMessage(final TransactionIdentifier transactionId, final short version) { return new CommitTransaction(transactionId, version).toSerializable(); } @Override - public boolean isSerializedReplyType(Object reply) { + public boolean isSerializedReplyType(final Object reply) { return CommitTransactionReply.isSerializedType(reply); } }; private static final MessageSupplier ABORT_MESSAGE_SUPPLIER = new MessageSupplier() { @Override - public Object newMessage(String transactionId, short version) { + public Object newMessage(final TransactionIdentifier transactionId, final short version) { return new AbortTransaction(transactionId, version).toSerializable(); } @Override - public boolean isSerializedReplyType(Object reply) { + public boolean isSerializedReplyType(final Object reply) { return AbortTransactionReply.isSerializedType(reply); } }; - private final ActorContext actorContext; + private final ActorUtils actorUtils; private final List cohorts; private final SettableFuture cohortsResolvedFuture = SettableFuture.create(); - private final String transactionId; + private final TransactionIdentifier transactionId; private volatile OperationCallback commitOperationCallback; - public ThreePhaseCommitCohortProxy(ActorContext actorContext, List cohorts, String transactionId) { - this.actorContext = actorContext; + public ThreePhaseCommitCohortProxy(final ActorUtils actorUtils, final List cohorts, + final TransactionIdentifier transactionId) { + this.actorUtils = actorUtils; this.cohorts = cohorts; - this.transactionId = transactionId; + this.transactionId = requireNonNull(transactionId); - if(cohorts.isEmpty()) { + if (cohorts.isEmpty()) { cohortsResolvedFuture.set(null); } } private ListenableFuture resolveCohorts() { - if(cohortsResolvedFuture.isDone()) { + if (cohortsResolvedFuture.isDone()) { return cohortsResolvedFuture; } final AtomicInteger completed = new AtomicInteger(cohorts.size()); - for(final CohortInfo info: cohorts) { + final Object lock = new Object(); + for (final CohortInfo info: cohorts) { info.getActorFuture().onComplete(new OnComplete() { @Override - public void onComplete(Throwable failure, ActorSelection actor) { - synchronized(completed) { + public void onComplete(final Throwable failure, final ActorSelection actor) { + synchronized (lock) { boolean done = completed.decrementAndGet() == 0; - if(failure != null) { + if (failure != null) { LOG.debug("Tx {}: a cohort Future failed", transactionId, failure); cohortsResolvedFuture.setException(failure); - } else if(!cohortsResolvedFuture.isDone()) { + } else if (!cohortsResolvedFuture.isDone()) { LOG.debug("Tx {}: cohort actor {} resolved", transactionId, actor); info.setResolvedActor(actor); - if(done) { + if (done) { LOG.debug("Tx {}: successfully resolved all cohort actors", transactionId); cohortsResolvedFuture.set(null); } } } } - }, actorContext.getClientDispatcher()); + }, actorUtils.getClientDispatcher()); } return cohortsResolvedFuture; @@ -125,15 +129,15 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< Futures.addCallback(resolveCohorts(), new FutureCallback() { @Override - public void onSuccess(Void notUsed) { + public void onSuccess(final Void notUsed) { finishCanCommit(returnFuture); } @Override - public void onFailure(Throwable failure) { + public void onFailure(final Throwable failure) { returnFuture.setException(failure); } - }); + }, MoreExecutors.directExecutor()); return returnFuture; } @@ -142,20 +146,20 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< LOG.debug("Tx {} finishCanCommit", transactionId); // For empty transactions return immediately - if(cohorts.size() == 0){ + if (cohorts.size() == 0) { LOG.debug("Tx {}: canCommit returning result true", transactionId); returnFuture.set(Boolean.TRUE); return; } - commitOperationCallback = new TransactionRateLimitingCallback(actorContext); + commitOperationCallback = new TransactionRateLimitingCallback(actorUtils); commitOperationCallback.run(); final Iterator iterator = cohorts.iterator(); - final OnComplete onComplete = new OnComplete() { + final OnComplete onComplete = new OnComplete<>() { @Override - public void onComplete(Throwable failure, Object response) { + public void onComplete(final Throwable failure, final Object response) { if (failure != null) { LOG.debug("Tx {}: a canCommit cohort Future failed", transactionId, failure); @@ -184,11 +188,11 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< return; } - if(iterator.hasNext() && result) { + if (iterator.hasNext() && result) { sendCanCommitTransaction(iterator.next(), this); } else { LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result); - returnFuture.set(Boolean.valueOf(result)); + returnFuture.set(result); } } @@ -197,32 +201,28 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< sendCanCommitTransaction(iterator.next(), onComplete); } - private void sendCanCommitTransaction(CohortInfo toCohortInfo, OnComplete onComplete) { + private void sendCanCommitTransaction(final CohortInfo toCohortInfo, final OnComplete onComplete) { CanCommitTransaction message = new CanCommitTransaction(transactionId, toCohortInfo.getActorVersion()); - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor()); - } + LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor()); - Future future = actorContext.executeOperationAsync(toCohortInfo.getResolvedActor(), - message.toSerializable(), actorContext.getTransactionCommitOperationTimeout()); - future.onComplete(onComplete, actorContext.getClientDispatcher()); + Future future = actorUtils.executeOperationAsync(toCohortInfo.getResolvedActor(), + message.toSerializable(), actorUtils.getTransactionCommitOperationTimeout()); + future.onComplete(onComplete, actorUtils.getClientDispatcher()); } - private Future> invokeCohorts(MessageSupplier messageSupplier) { - List> futureList = Lists.newArrayListWithCapacity(cohorts.size()); - for(CohortInfo cohort : cohorts) { + private Future> invokeCohorts(final MessageSupplier messageSupplier) { + List> futureList = new ArrayList<>(cohorts.size()); + for (CohortInfo cohort : cohorts) { Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion()); - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort); - } + LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort.getResolvedActor()); - futureList.add(actorContext.executeOperationAsync(cohort.getResolvedActor(), message, - actorContext.getTransactionCommitOperationTimeout())); + futureList.add(actorUtils.executeOperationAsync(cohort.getResolvedActor(), message, + actorUtils.getTransactionCommitOperationTimeout())); } - return akka.dispatch.Futures.sequence(futureList, actorContext.getClientDispatcher()); + return akka.dispatch.Futures.sequence(futureList, actorUtils.getClientDispatcher()); } @Override @@ -253,15 +253,16 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< CommitTransactionReply.class, true, operationCallback); } - private static boolean successfulFuture(ListenableFuture future) { - if(!future.isDone()) { + @SuppressWarnings("checkstyle:IllegalCatch") + private static boolean successfulFuture(final ListenableFuture future) { + if (!future.isDone()) { return false; } try { future.get(); return true; - } catch(Exception e) { + } catch (Exception e) { return false; } } @@ -277,34 +278,34 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< // if not for some reason, we'll try to build it here. ListenableFuture future = resolveCohorts(); - if(successfulFuture(future)) { + if (successfulFuture(future)) { finishVoidOperation(operationName, messageSupplier, expectedResponseClass, propagateException, returnFuture, callback); } else { Futures.addCallback(future, new FutureCallback() { @Override - public void onSuccess(Void notUsed) { + public void onSuccess(final Void notUsed) { finishVoidOperation(operationName, messageSupplier, expectedResponseClass, propagateException, returnFuture, callback); } @Override - public void onFailure(Throwable failure) { - LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId, operationName, failure); + public void onFailure(final Throwable failure) { + LOG.debug("Tx {}: a {} cohort path Future failed", transactionId, operationName, failure); - if(propagateException) { + if (propagateException) { returnFuture.setException(failure); } else { returnFuture.set(null); } } - }); + }, MoreExecutors.directExecutor()); } return returnFuture; } - private void finishVoidOperation(final String operationName, MessageSupplier messageSupplier, + private void finishVoidOperation(final String operationName, final MessageSupplier messageSupplier, final Class expectedResponseClass, final boolean propagateException, final SettableFuture returnFuture, final OperationCallback callback) { LOG.debug("Tx {} finish {}", transactionId, operationName); @@ -315,11 +316,11 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< combinedFuture.onComplete(new OnComplete>() { @Override - public void onComplete(Throwable failure, Iterable responses) throws Throwable { + public void onComplete(final Throwable failure, final Iterable responses) { Throwable exceptionToPropagate = failure; - if(exceptionToPropagate == null) { - for(Object response: responses) { - if(!response.getClass().equals(expectedResponseClass)) { + if (exceptionToPropagate == null) { + for (Object response: responses) { + if (!response.getClass().equals(expectedResponseClass)) { exceptionToPropagate = new IllegalArgumentException( String.format("Unexpected response type %s", response.getClass())); break; @@ -327,9 +328,9 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< } } - if(exceptionToPropagate != null) { + if (exceptionToPropagate != null) { LOG.debug("Tx {}: a {} cohort Future failed", transactionId, operationName, exceptionToPropagate); - if(propagateException) { + if (propagateException) { // We don't log the exception here to avoid redundant logging since we're // propagating to the caller in MD-SAL core who will log it. returnFuture.setException(exceptionToPropagate); @@ -349,13 +350,13 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< callback.success(); } } - }, actorContext.getClientDispatcher()); + }, actorUtils.getClientDispatcher()); } @Override List> getCohortFutures() { List> cohortFutures = new ArrayList<>(cohorts.size()); - for(CohortInfo info: cohorts) { + for (CohortInfo info: cohorts) { cohortFutures.add(info.getActorFuture()); } @@ -364,10 +365,11 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< static class CohortInfo { private final Future actorFuture; - private volatile ActorSelection resolvedActor; private final Supplier actorVersionSupplier; - CohortInfo(Future actorFuture, Supplier actorVersionSupplier) { + private volatile ActorSelection resolvedActor; + + CohortInfo(final Future actorFuture, final Supplier actorVersionSupplier) { this.actorFuture = actorFuture; this.actorVersionSupplier = actorVersionSupplier; } @@ -380,19 +382,19 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< return resolvedActor; } - void setResolvedActor(ActorSelection resolvedActor) { + void setResolvedActor(final ActorSelection resolvedActor) { this.resolvedActor = resolvedActor; } short getActorVersion() { - Preconditions.checkState(resolvedActor != null, - "getActorVersion cannot be called until the actor is resolved"); + checkState(resolvedActor != null, "getActorVersion cannot be called until the actor is resolved"); return actorVersionSupplier.get(); } } private interface MessageSupplier { - Object newMessage(String transactionId, short version); + Object newMessage(TransactionIdentifier transactionId, short version); + boolean isSerializedReplyType(Object reply); } }