X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FThreePhaseCommitCohortProxy.java;h=cdd57df0006c2ae848acce67b3d8a56373ebe27a;hb=2dedb8231e13abe55d6b75eb532d23dbe536e168;hp=4d80d7fd8ab3fb3c5200f62aa4c589a6343ad2b7;hpb=634dfac8eead60f443bf75e749c70d1f2bb29198;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index 4d80d7fd8a..cdd57df000 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -5,14 +5,13 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.datastore; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorSelection; import akka.dispatch.OnComplete; -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -22,6 +21,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; @@ -29,7 +29,9 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; +import org.opendaylight.mdsal.common.api.CommitInfo; +import org.opendaylight.yangtools.yang.common.Empty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; @@ -65,24 +67,24 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< } }; - private final ActorContext actorContext; + private final ActorUtils actorUtils; private final List cohorts; - private final SettableFuture cohortsResolvedFuture = SettableFuture.create(); + private final SettableFuture cohortsResolvedFuture = SettableFuture.create(); private final TransactionIdentifier transactionId; private volatile OperationCallback commitOperationCallback; - public ThreePhaseCommitCohortProxy(final ActorContext actorContext, final List cohorts, + public ThreePhaseCommitCohortProxy(final ActorUtils actorUtils, final List cohorts, final TransactionIdentifier transactionId) { - this.actorContext = actorContext; + this.actorUtils = actorUtils; this.cohorts = cohorts; - this.transactionId = Preconditions.checkNotNull(transactionId); + this.transactionId = requireNonNull(transactionId); if (cohorts.isEmpty()) { - cohortsResolvedFuture.set(null); + cohortsResolvedFuture.set(Empty.value()); } } - private ListenableFuture resolveCohorts() { + private ListenableFuture resolveCohorts() { if (cohortsResolvedFuture.isDone()) { return cohortsResolvedFuture; } @@ -104,12 +106,12 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< info.setResolvedActor(actor); if (done) { LOG.debug("Tx {}: successfully resolved all cohort actors", transactionId); - cohortsResolvedFuture.set(null); + cohortsResolvedFuture.set(Empty.value()); } } } } - }, actorContext.getClientDispatcher()); + }, actorUtils.getClientDispatcher()); } return cohortsResolvedFuture; @@ -127,9 +129,9 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier // and passed to us from upstream processing. If any one fails then we'll fail canCommit. - Futures.addCallback(resolveCohorts(), new FutureCallback() { + Futures.addCallback(resolveCohorts(), new FutureCallback<>() { @Override - public void onSuccess(final Void notUsed) { + public void onSuccess(final Empty result) { finishCanCommit(returnFuture); } @@ -152,12 +154,12 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< return; } - commitOperationCallback = new TransactionRateLimitingCallback(actorContext); + commitOperationCallback = new TransactionRateLimitingCallback(actorUtils); commitOperationCallback.run(); final Iterator iterator = cohorts.iterator(); - final OnComplete onComplete = new OnComplete() { + final OnComplete onComplete = new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object response) { if (failure != null) { @@ -192,7 +194,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< sendCanCommitTransaction(iterator.next(), this); } else { LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result); - returnFuture.set(Boolean.valueOf(result)); + returnFuture.set(result); } } @@ -206,55 +208,54 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor()); - Future future = actorContext.executeOperationAsync(toCohortInfo.getResolvedActor(), - message.toSerializable(), actorContext.getTransactionCommitOperationTimeout()); - future.onComplete(onComplete, actorContext.getClientDispatcher()); + Future future = actorUtils.executeOperationAsync(toCohortInfo.getResolvedActor(), + message.toSerializable(), actorUtils.getTransactionCommitOperationTimeout()); + future.onComplete(onComplete, actorUtils.getClientDispatcher()); } private Future> invokeCohorts(final MessageSupplier messageSupplier) { - List> futureList = Lists.newArrayListWithCapacity(cohorts.size()); + List> futureList = new ArrayList<>(cohorts.size()); for (CohortInfo cohort : cohorts) { Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion()); LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort.getResolvedActor()); - futureList.add(actorContext.executeOperationAsync(cohort.getResolvedActor(), message, - actorContext.getTransactionCommitOperationTimeout())); + futureList.add(actorUtils.executeOperationAsync(cohort.getResolvedActor(), message, + actorUtils.getTransactionCommitOperationTimeout())); } - return akka.dispatch.Futures.sequence(futureList, actorContext.getClientDispatcher()); + return akka.dispatch.Futures.sequence(futureList, actorUtils.getClientDispatcher()); } @Override - public ListenableFuture preCommit() { - // We don't need to do anything here - preCommit is done atomically with the commit phase - // by the shard. - return IMMEDIATE_VOID_SUCCESS; + public ListenableFuture preCommit() { + // We don't need to do anything here - preCommit is done atomically with the commit phase by the shard. + return IMMEDIATE_EMPTY_SUCCESS; } @Override - public ListenableFuture abort() { + public ListenableFuture abort() { // Note - we pass false for propagateException. In the front-end data broker, this method // is called when one of the 3 phases fails with an exception. We'd rather have that // original exception propagated to the client. If our abort fails and we propagate the // exception then that exception will supersede and suppress the original exception. But // it's the original exception that is the root cause and of more interest to the client. - return voidOperation("abort", ABORT_MESSAGE_SUPPLIER, - AbortTransactionReply.class, false, OperationCallback.NO_OP_CALLBACK); + return operation("abort", Empty.value(), ABORT_MESSAGE_SUPPLIER, AbortTransactionReply.class, false, + OperationCallback.NO_OP_CALLBACK); } @Override - public ListenableFuture commit() { + public ListenableFuture commit() { OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback : OperationCallback.NO_OP_CALLBACK; - return voidOperation("commit", COMMIT_MESSAGE_SUPPLIER, - CommitTransactionReply.class, true, operationCallback); + return operation("commit", CommitInfo.empty(), COMMIT_MESSAGE_SUPPLIER, CommitTransactionReply.class, true, + operationCallback); } @SuppressWarnings("checkstyle:IllegalCatch") - private static boolean successfulFuture(final ListenableFuture future) { + private static boolean successfulFuture(final ListenableFuture future) { if (!future.isDone()) { return false; } @@ -267,26 +268,26 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< } } - private ListenableFuture voidOperation(final String operationName, + private ListenableFuture operation(final String operationName, final T futureValue, final MessageSupplier messageSupplier, final Class expectedResponseClass, final boolean propagateException, final OperationCallback callback) { LOG.debug("Tx {} {}", transactionId, operationName); - final SettableFuture returnFuture = SettableFuture.create(); + final SettableFuture returnFuture = SettableFuture.create(); // The cohort actor list should already be built at this point by the canCommit phase but, // if not for some reason, we'll try to build it here. - ListenableFuture future = resolveCohorts(); + ListenableFuture future = resolveCohorts(); if (successfulFuture(future)) { - finishVoidOperation(operationName, messageSupplier, expectedResponseClass, propagateException, - returnFuture, callback); + finishOperation(operationName, messageSupplier, expectedResponseClass, propagateException, returnFuture, + futureValue, callback); } else { - Futures.addCallback(future, new FutureCallback() { + Futures.addCallback(future, new FutureCallback<>() { @Override - public void onSuccess(final Void notUsed) { - finishVoidOperation(operationName, messageSupplier, expectedResponseClass, - propagateException, returnFuture, callback); + public void onSuccess(final Empty result) { + finishOperation(operationName, messageSupplier, expectedResponseClass, propagateException, + returnFuture, futureValue, callback); } @Override @@ -296,7 +297,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< if (propagateException) { returnFuture.setException(failure); } else { - returnFuture.set(null); + returnFuture.set(futureValue); } } }, MoreExecutors.directExecutor()); @@ -305,9 +306,10 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< return returnFuture; } - private void finishVoidOperation(final String operationName, final MessageSupplier messageSupplier, + private void finishOperation(final String operationName, final MessageSupplier messageSupplier, final Class expectedResponseClass, final boolean propagateException, - final SettableFuture returnFuture, final OperationCallback callback) { + final SettableFuture returnFuture, final T futureValue, + final OperationCallback callback) { LOG.debug("Tx {} finish {}", transactionId, operationName); callback.resume(); @@ -338,19 +340,19 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< // Since the caller doesn't want us to propagate the exception we'll also // not log it normally. But it's usually not good to totally silence // exceptions so we'll log it to debug level. - returnFuture.set(null); + returnFuture.set(futureValue); } callback.failure(); } else { LOG.debug("Tx {}: {} succeeded", transactionId, operationName); - returnFuture.set(null); + returnFuture.set(futureValue); callback.success(); } } - }, actorContext.getClientDispatcher()); + }, actorUtils.getClientDispatcher()); } @Override @@ -365,9 +367,10 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< static class CohortInfo { private final Future actorFuture; - private volatile ActorSelection resolvedActor; private final Supplier actorVersionSupplier; + private volatile ActorSelection resolvedActor; + CohortInfo(final Future actorFuture, final Supplier actorVersionSupplier) { this.actorFuture = actorFuture; this.actorVersionSupplier = actorVersionSupplier; @@ -386,8 +389,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< } short getActorVersion() { - Preconditions.checkState(resolvedActor != null, - "getActorVersion cannot be called until the actor is resolved"); + checkState(resolvedActor != null, "getActorVersion cannot be called until the actor is resolved"); return actorVersionSupplier.get(); } }