X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FThreePhaseCommitCohortProxy.java;h=5f82584f856cc69b987c473178232d3aea6b3a66;hp=90dcec238aa13f302d429aa1b2e38c7d78b81d67;hb=987e2e706d0b343304142626bc870f3e8c909b51;hpb=731e7284cf0895fdb1b89427f91762e80e67c2ff diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index 90dcec238a..5f82584f85 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -5,23 +5,24 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.datastore; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorSelection; import akka.dispatch.OnComplete; -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; @@ -29,7 +30,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; @@ -65,17 +66,17 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< } }; - private final ActorContext actorContext; + private final ActorUtils actorUtils; private final List cohorts; private final SettableFuture cohortsResolvedFuture = SettableFuture.create(); private final TransactionIdentifier transactionId; private volatile OperationCallback commitOperationCallback; - public ThreePhaseCommitCohortProxy(final ActorContext actorContext, final List cohorts, + public ThreePhaseCommitCohortProxy(final ActorUtils actorUtils, final List cohorts, final TransactionIdentifier transactionId) { - this.actorContext = actorContext; + this.actorUtils = actorUtils; this.cohorts = cohorts; - this.transactionId = Preconditions.checkNotNull(transactionId); + this.transactionId = requireNonNull(transactionId); if (cohorts.isEmpty()) { cohortsResolvedFuture.set(null); @@ -109,7 +110,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< } } } - }, actorContext.getClientDispatcher()); + }, actorUtils.getClientDispatcher()); } return cohortsResolvedFuture; @@ -142,6 +143,8 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< return returnFuture; } + @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", + justification = "https://github.com/spotbugs/spotbugs/issues/811") private void finishCanCommit(final SettableFuture returnFuture) { LOG.debug("Tx {} finishCanCommit", transactionId); @@ -152,7 +155,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< return; } - commitOperationCallback = new TransactionRateLimitingCallback(actorContext); + commitOperationCallback = new TransactionRateLimitingCallback(actorUtils); commitOperationCallback.run(); final Iterator iterator = cohorts.iterator(); @@ -206,23 +209,23 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor()); - Future future = actorContext.executeOperationAsync(toCohortInfo.getResolvedActor(), - message.toSerializable(), actorContext.getTransactionCommitOperationTimeout()); - future.onComplete(onComplete, actorContext.getClientDispatcher()); + Future future = actorUtils.executeOperationAsync(toCohortInfo.getResolvedActor(), + message.toSerializable(), actorUtils.getTransactionCommitOperationTimeout()); + future.onComplete(onComplete, actorUtils.getClientDispatcher()); } private Future> invokeCohorts(final MessageSupplier messageSupplier) { - List> futureList = Lists.newArrayListWithCapacity(cohorts.size()); + List> futureList = new ArrayList<>(cohorts.size()); for (CohortInfo cohort : cohorts) { Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion()); LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort.getResolvedActor()); - futureList.add(actorContext.executeOperationAsync(cohort.getResolvedActor(), message, - actorContext.getTransactionCommitOperationTimeout())); + futureList.add(actorUtils.executeOperationAsync(cohort.getResolvedActor(), message, + actorUtils.getTransactionCommitOperationTimeout())); } - return akka.dispatch.Futures.sequence(futureList, actorContext.getClientDispatcher()); + return akka.dispatch.Futures.sequence(futureList, actorUtils.getClientDispatcher()); } @Override @@ -291,7 +294,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< @Override public void onFailure(final Throwable failure) { - LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId, operationName, failure); + LOG.debug("Tx {}: a {} cohort path Future failed", transactionId, operationName, failure); if (propagateException) { returnFuture.setException(failure); @@ -316,7 +319,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< combinedFuture.onComplete(new OnComplete>() { @Override - public void onComplete(final Throwable failure, final Iterable responses) throws Throwable { + public void onComplete(final Throwable failure, final Iterable responses) { Throwable exceptionToPropagate = failure; if (exceptionToPropagate == null) { for (Object response: responses) { @@ -350,7 +353,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< callback.success(); } } - }, actorContext.getClientDispatcher()); + }, actorUtils.getClientDispatcher()); } @Override @@ -365,9 +368,10 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< static class CohortInfo { private final Future actorFuture; - private volatile ActorSelection resolvedActor; private final Supplier actorVersionSupplier; + private volatile ActorSelection resolvedActor; + CohortInfo(final Future actorFuture, final Supplier actorVersionSupplier) { this.actorFuture = actorFuture; this.actorVersionSupplier = actorVersionSupplier; @@ -386,8 +390,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< } short getActorVersion() { - Preconditions.checkState(resolvedActor != null, - "getActorVersion cannot be called until the actor is resolved"); + checkState(resolvedActor != null, "getActorVersion cannot be called until the actor is resolved"); return actorVersionSupplier.get(); } }