From: Tom Pantelis Date: Fri, 22 Jan 2016 19:03:04 +0000 (-0500) Subject: Handle 3PC message backwards compatibility X-Git-Tag: release/boron~410 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=0281535ab08fd795e42df66d25e9a904ff941ad7 Handle 3PC message backwards compatibility Modified the ThreePhaseCommitCohortProxy to send the appropriate 3PC messages based on the backend cohort actor's version. The version is supplied via a new CohortInfo class which also holds the existing cohort ActorSelection Future. The caller actually specifies a Supplier to obtain the version b/c the version may not be known until the ActorSelection Future is known. The TransactionProxy passes a Supplier which obtains the version from the TransactionContext. As a result, the ThreePhaseCommitCohortProxy was refactored a bit to handle the CohortInfo instances. The ThreePhaseCommitCohortProxyTest was refactored to use real actors for the cohorts instead of mocking messages via a mocked ActorContext. Modified the ShardCommitCoordinator to return the appropriate versioned replies. Change-Id: I13e6acf81c1fe8abda43c30b1a73648e411ab500 Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java index 97a0205ff2..7b25abbe83 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java @@ -17,9 +17,16 @@ abstract class AbstractTransactionContext implements TransactionContext { private final TransactionIdentifier transactionIdentifier; private long modificationCount = 0; private boolean handOffComplete; + private final short transactionVersion; protected AbstractTransactionContext(TransactionIdentifier transactionIdentifier) { + this(transactionIdentifier, DataStoreVersions.CURRENT_VERSION); + } + + protected AbstractTransactionContext(TransactionIdentifier transactionIdentifier, + short transactionVersion) { this.transactionIdentifier = transactionIdentifier; + this.transactionVersion = transactionVersion; } /** @@ -52,4 +59,9 @@ abstract class AbstractTransactionContext implements TransactionContext { public boolean usesOperationLimiting() { return false; } + + @Override + public short getTransactionVersion() { + return transactionVersion; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java index d0bef0000e..67916cf1d2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java @@ -36,7 +36,6 @@ public class RemoteTransactionContext extends AbstractTransactionContext { private final ActorContext actorContext; private final ActorSelection actor; - private final short remoteTransactionVersion; private final OperationLimiter limiter; private BatchedModifications batchedModifications; @@ -44,11 +43,10 @@ public class RemoteTransactionContext extends AbstractTransactionContext { protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor, ActorContext actorContext, short remoteTransactionVersion, OperationLimiter limiter) { - super(identifier); + super(identifier, remoteTransactionVersion); this.limiter = Preconditions.checkNotNull(limiter); this.actor = actor; this.actorContext = actorContext; - this.remoteTransactionVersion = remoteTransactionVersion; } private Future completeOperation(Future operationFuture){ @@ -64,10 +62,6 @@ public class RemoteTransactionContext extends AbstractTransactionContext { return actorContext; } - protected short getRemoteTransactionVersion() { - return remoteTransactionVersion; - } - protected Future executeOperationAsync(SerializableMessage msg) { return completeOperation(actorContext.executeOperationAsync(getActor(), msg.toSerializable())); } @@ -77,7 +71,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { LOG.debug("Tx {} closeTransaction called", getIdentifier()); TransactionContextCleanup.untrack(this); - actorContext.sendOperationAsync(getActor(), new CloseTransaction(remoteTransactionVersion).toSerializable()); + actorContext.sendOperationAsync(getActor(), new CloseTransaction(getTransactionVersion()).toSerializable()); } @Override @@ -115,7 +109,8 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } private BatchedModifications newBatchedModifications() { - return new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, getIdentifier().getChainId()); + return new BatchedModifications(getIdentifier().toString(), getTransactionVersion(), + getIdentifier().getChainId()); } private void batchModification(Modification modification) { @@ -206,7 +201,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } }; - Future future = executeOperationAsync(readCmd.asVersion(remoteTransactionVersion)); + Future future = executeOperationAsync(readCmd.asVersion(getTransactionVersion())); future.onComplete(onComplete, actorContext.getClientDispatcher()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 18be295dbb..74e25323bc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -347,7 +347,7 @@ public class Shard extends RaftActor { try { cohortEntry.commit(); - sender.tell(CommitTransactionReply.instance(DataStoreVersions.CURRENT_VERSION).toSerializable(), getSelf()); + sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), getSelf()); shardMBean.incrementCommittedTransactionCount(); shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis()); @@ -384,7 +384,8 @@ public class Shard extends RaftActor { LOG.error("{}: Failed to re-apply transaction {}", persistenceId(), transactionID, e); } - sender.tell(CommitTransactionReply.instance(DataStoreVersions.CURRENT_VERSION).toSerializable(), getSelf()); + sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), + getSelf()); } else { // This really shouldn't happen - it likely means that persistence or replication // took so long to complete such that the cohort entry was expired from the cache. diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 0c6012b9a2..51d8d5caec 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -135,7 +135,7 @@ class ShardCommitCoordinator { ready.getTransactionID(), ready.getTxnClientVersion()); ShardDataTreeCohort cohort = ready.getTransaction().ready(); - CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort); + CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, ready.getTxnClientVersion()); cohortCache.put(ready.getTransactionID(), cohortEntry); if(!queueCohortEntry(cohortEntry, sender, shard)) { @@ -169,7 +169,7 @@ class ShardCommitCoordinator { if(cohortEntry == null) { cohortEntry = new CohortEntry(batched.getTransactionID(), dataTree.newReadWriteTransaction(batched.getTransactionID(), - batched.getTransactionChainID())); + batched.getTransactionChainID()), batched.getVersion()); cohortCache.put(batched.getTransactionID(), cohortEntry); } @@ -227,7 +227,8 @@ class ShardCommitCoordinator { void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) { final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(), message.getTransactionID()); - final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort); + final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort, + DataStoreVersions.CURRENT_VERSION); cohortCache.put(message.getTransactionID(), cohortEntry); cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady()); @@ -321,8 +322,9 @@ class ShardCommitCoordinator { } else { // FIXME - use caller's version cohortEntry.getReplySender().tell( - canCommit ? CanCommitTransactionReply.yes(DataStoreVersions.CURRENT_VERSION).toSerializable() : - CanCommitTransactionReply.no(DataStoreVersions.CURRENT_VERSION).toSerializable(), cohortEntry.getShard().self()); + canCommit ? CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable() : + CanCommitTransactionReply.no(cohortEntry.getClientVersion()).toSerializable(), + cohortEntry.getShard().self()); } } catch (Exception e) { log.debug("{}: An exception occurred during canCommit", name, e); @@ -421,7 +423,7 @@ class ShardCommitCoordinator { shard.getShardMBean().incrementAbortTransactionsCount(); if(sender != null) { - sender.tell(new AbortTransactionReply().toSerializable(), self); + sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self); } } catch (Exception e) { log.error("{}: An exception happened during abort", name, e); @@ -587,16 +589,19 @@ class ShardCommitCoordinator { private final Stopwatch lastAccessTimer = Stopwatch.createStarted(); private int totalBatchedModificationsReceived; private boolean aborted; + private final short clientVersion; - CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) { + CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction, short clientVersion) { this.transaction = Preconditions.checkNotNull(transaction); this.transactionID = transactionID; + this.clientVersion = clientVersion; } - CohortEntry(String transactionID, ShardDataTreeCohort cohort) { + CohortEntry(String transactionID, ShardDataTreeCohort cohort, short clientVersion) { this.transactionID = transactionID; this.cohort = cohort; this.transaction = null; + this.clientVersion = clientVersion; } void updateLastAccessTime() { @@ -608,6 +613,10 @@ class ShardCommitCoordinator { return transactionID; } + short getClientVersion() { + return clientVersion; + } + DataTreeCandidate getCandidate() { return cohort.getCandidate(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SingleCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SingleCommitCohortProxy.java index 0823c902ae..9c17bc1a47 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SingleCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SingleCommitCohortProxy.java @@ -21,8 +21,7 @@ import scala.concurrent.Future; /** * A cohort proxy implementation for a single-shard transaction commit. If the transaction was a direct commit * to the shard, this implementation elides the CanCommitTransaction and CommitTransaction messages to the - * shard as an optimization. Otherwise the 3-phase commit to the shard is delegated to a - * ThreePhaseCommitCohortProxy instance (this is for backwards compatibility with pre-Lithium versions). + * shard as an optimization. * * @author Thomas Pantelis */ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index 1e59f0d097..9f94731b59 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -9,14 +9,18 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; -import akka.dispatch.Futures; import akka.dispatch.OnComplete; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import java.util.Collections; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; @@ -27,7 +31,6 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; -import scala.runtime.AbstractFunction1; /** * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies @@ -36,42 +39,82 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class); + private static final MessageSupplier COMMIT_MESSAGE_SUPPLIER = new MessageSupplier() { + @Override + public Object newMessage(String transactionId, short version) { + return new CommitTransaction(transactionId, version).toSerializable(); + } + + @Override + public boolean isSerializedReplyType(Object reply) { + return CommitTransactionReply.isSerializedType(reply); + } + }; + + private static final MessageSupplier ABORT_MESSAGE_SUPPLIER = new MessageSupplier() { + @Override + public Object newMessage(String transactionId, short version) { + return new AbortTransaction(transactionId, version).toSerializable(); + } + + @Override + public boolean isSerializedReplyType(Object reply) { + return AbortTransactionReply.isSerializedType(reply); + } + }; + private final ActorContext actorContext; - private final List> cohortFutures; - private volatile List cohorts; + private final List cohorts; + private final SettableFuture cohortsResolvedFuture = SettableFuture.create(); private final String transactionId; private volatile OperationCallback commitOperationCallback; - public ThreePhaseCommitCohortProxy(ActorContext actorContext, - List> cohortFutures, String transactionId) { + public ThreePhaseCommitCohortProxy(ActorContext actorContext, List cohorts, String transactionId) { this.actorContext = actorContext; - this.cohortFutures = cohortFutures; + this.cohorts = cohorts; this.transactionId = transactionId; - } - private Future buildCohortList() { + if(cohorts.isEmpty()) { + cohortsResolvedFuture.set(null); + } + } - Future> combinedFutures = Futures.sequence(cohortFutures, - actorContext.getClientDispatcher()); + private ListenableFuture resolveCohorts() { + if(cohortsResolvedFuture.isDone()) { + return cohortsResolvedFuture; + } - return combinedFutures.transform(new AbstractFunction1, Void>() { - @Override - public Void apply(Iterable actorSelections) { - cohorts = Lists.newArrayList(actorSelections); - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} successfully built cohort path list: {}", - transactionId, cohorts); + final AtomicInteger completed = new AtomicInteger(cohorts.size()); + for(final CohortInfo info: cohorts) { + info.getActorFuture().onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, ActorSelection actor) { + synchronized(completed) { + boolean done = completed.decrementAndGet() == 0; + if(failure != null) { + LOG.debug("Tx {}: a cohort Future failed", transactionId, failure); + cohortsResolvedFuture.setException(failure); + } else if(!cohortsResolvedFuture.isDone()) { + LOG.debug("Tx {}: cohort actor {} resolved", transactionId, actor); + + info.setResolvedActor(actor); + if(done) { + LOG.debug("Tx {}: successfully resolved all cohort actors", transactionId); + cohortsResolvedFuture.set(null); + } + } + } } - return null; - } - }, TransactionReadyReplyMapper.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher()); + }, actorContext.getClientDispatcher()); + } + + return cohortsResolvedFuture; } @Override public ListenableFuture canCommit() { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} canCommit", transactionId); - } + LOG.debug("Tx {} canCommit", transactionId); + final SettableFuture returnFuture = SettableFuture.create(); // The first phase of canCommit is to gather the list of cohort actor paths that will @@ -80,53 +123,42 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier // and passed to us from upstream processing. If any one fails then we'll fail canCommit. - buildCohortList().onComplete(new OnComplete() { + Futures.addCallback(resolveCohorts(), new FutureCallback() { @Override - public void onComplete(Throwable failure, Void notUsed) throws Throwable { - if(failure != null) { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {}: a cohort Future failed: {}", transactionId, failure); - } - returnFuture.setException(failure); - } else { - finishCanCommit(returnFuture); - } + public void onSuccess(Void notUsed) { + finishCanCommit(returnFuture); } - }, actorContext.getClientDispatcher()); + + @Override + public void onFailure(Throwable failure) { + returnFuture.setException(failure); + } + }); return returnFuture; } private void finishCanCommit(final SettableFuture returnFuture) { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} finishCanCommit", transactionId); - } + LOG.debug("Tx {} finishCanCommit", transactionId); // For empty transactions return immediately if(cohorts.size() == 0){ - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {}: canCommit returning result: {}", transactionId, true); - } + LOG.debug("Tx {}: canCommit returning result true", transactionId); returnFuture.set(Boolean.TRUE); return; } - commitOperationCallback = cohortFutures.isEmpty() ? OperationCallback.NO_OP_CALLBACK : - new TransactionRateLimitingCallback(actorContext); - + commitOperationCallback = new TransactionRateLimitingCallback(actorContext); commitOperationCallback.run(); - final Object message = new CanCommitTransaction(transactionId, DataStoreVersions.CURRENT_VERSION).toSerializable(); - - final Iterator iterator = cohorts.iterator(); + final Iterator iterator = cohorts.iterator(); final OnComplete onComplete = new OnComplete() { @Override - public void onComplete(Throwable failure, Object response) throws Throwable { + public void onComplete(Throwable failure, Object response) { if (failure != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure); - } + LOG.debug("Tx {}: a canCommit cohort Future failed", transactionId, failure); + returnFuture.setException(failure); commitOperationCallback.failure(); return; @@ -138,8 +170,10 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< boolean result = true; if (CanCommitTransactionReply.isSerializedType(response)) { - CanCommitTransactionReply reply = - CanCommitTransactionReply.fromSerializable(response); + CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(response); + + LOG.debug("Tx {}: received {}", transactionId, response); + if (!reply.getCanCommit()) { result = false; } @@ -150,35 +184,45 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< return; } - if(iterator.hasNext() && result){ - Future future = actorContext.executeOperationAsync(iterator.next(), message, - actorContext.getTransactionCommitOperationTimeout()); - future.onComplete(this, actorContext.getClientDispatcher()); + if(iterator.hasNext() && result) { + sendCanCommitTransaction(iterator.next(), this); } else { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result); - } + LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result); returnFuture.set(Boolean.valueOf(result)); } } }; - Future future = actorContext.executeOperationAsync(iterator.next(), message, - actorContext.getTransactionCommitOperationTimeout()); + sendCanCommitTransaction(iterator.next(), onComplete); + } + + private void sendCanCommitTransaction(CohortInfo toCohortInfo, OnComplete onComplete) { + CanCommitTransaction message = new CanCommitTransaction(transactionId, toCohortInfo.getActorVersion()); + + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor()); + } + + Future future = actorContext.executeOperationAsync(toCohortInfo.getResolvedActor(), + message.toSerializable(), actorContext.getTransactionCommitOperationTimeout()); future.onComplete(onComplete, actorContext.getClientDispatcher()); } - private Future> invokeCohorts(Object message) { + private Future> invokeCohorts(MessageSupplier messageSupplier) { List> futureList = Lists.newArrayListWithCapacity(cohorts.size()); - for(ActorSelection cohort : cohorts) { + for(CohortInfo cohort : cohorts) { + Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion()); + if(LOG.isDebugEnabled()) { - LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort); + LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort); } - futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout())); + + futureList.add(actorContext.executeOperationAsync(cohort.getResolvedActor(), message, + actorContext.getTransactionCommitOperationTimeout())); } - return Futures.sequence(futureList, actorContext.getClientDispatcher()); + return akka.dispatch.Futures.sequence(futureList, actorContext.getClientDispatcher()); } @Override @@ -196,8 +240,8 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< // exception then that exception will supersede and suppress the original exception. But // it's the original exception that is the root cause and of more interest to the client. - return voidOperation("abort", new AbortTransaction(transactionId, DataStoreVersions.CURRENT_VERSION).toSerializable(), - AbortTransactionReply.class, false); + return voidOperation("abort", ABORT_MESSAGE_SUPPLIER, + AbortTransactionReply.class, false, OperationCallback.NO_OP_CALLBACK); } @Override @@ -205,88 +249,86 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback : OperationCallback.NO_OP_CALLBACK; - return voidOperation("commit", new CommitTransaction(transactionId, DataStoreVersions.CURRENT_VERSION).toSerializable(), + return voidOperation("commit", COMMIT_MESSAGE_SUPPLIER, CommitTransactionReply.class, true, operationCallback); } - private ListenableFuture voidOperation(final String operationName, final Object message, - final Class expectedResponseClass, final boolean propagateException) { - return voidOperation(operationName, message, expectedResponseClass, propagateException, - OperationCallback.NO_OP_CALLBACK); + private boolean successfulFuture(ListenableFuture future) { + if(!future.isDone()) { + return false; + } + + try { + future.get(); + return true; + } catch(Exception e) { + return false; + } } - private ListenableFuture voidOperation(final String operationName, final Object message, - final Class expectedResponseClass, final boolean propagateException, final OperationCallback callback) { + private ListenableFuture voidOperation(final String operationName, + final MessageSupplier messageSupplier, final Class expectedResponseClass, + final boolean propagateException, final OperationCallback callback) { + LOG.debug("Tx {} {}", transactionId, operationName); - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} {}", transactionId, operationName); - } final SettableFuture returnFuture = SettableFuture.create(); // The cohort actor list should already be built at this point by the canCommit phase but, // if not for some reason, we'll try to build it here. - if(cohorts != null) { - finishVoidOperation(operationName, message, expectedResponseClass, propagateException, + ListenableFuture future = resolveCohorts(); + if(successfulFuture(future)) { + finishVoidOperation(operationName, messageSupplier, expectedResponseClass, propagateException, returnFuture, callback); } else { - buildCohortList().onComplete(new OnComplete() { + Futures.addCallback(future, new FutureCallback() { @Override - public void onComplete(Throwable failure, Void notUsed) throws Throwable { - if(failure != null) { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId, - operationName, failure); - } - if(propagateException) { - returnFuture.setException(failure); - } else { - returnFuture.set(null); - } + public void onSuccess(Void notUsed) { + finishVoidOperation(operationName, messageSupplier, expectedResponseClass, + propagateException, returnFuture, callback); + } + + @Override + public void onFailure(Throwable failure) { + LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId, operationName, failure); + + if(propagateException) { + returnFuture.setException(failure); } else { - finishVoidOperation(operationName, message, expectedResponseClass, - propagateException, returnFuture, callback); + returnFuture.set(null); } } - }, actorContext.getClientDispatcher()); + }); } return returnFuture; } - private void finishVoidOperation(final String operationName, final Object message, + private void finishVoidOperation(final String operationName, MessageSupplier messageSupplier, final Class expectedResponseClass, final boolean propagateException, final SettableFuture returnFuture, final OperationCallback callback) { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} finish {}", transactionId, operationName); - } + LOG.debug("Tx {} finish {}", transactionId, operationName); callback.resume(); - Future> combinedFuture = invokeCohorts(message); + Future> combinedFuture = invokeCohorts(messageSupplier); combinedFuture.onComplete(new OnComplete>() { @Override public void onComplete(Throwable failure, Iterable responses) throws Throwable { - Throwable exceptionToPropagate = failure; if(exceptionToPropagate == null) { for(Object response: responses) { if(!response.getClass().equals(expectedResponseClass)) { exceptionToPropagate = new IllegalArgumentException( - String.format("Unexpected response type %s", - response.getClass())); + String.format("Unexpected response type %s", response.getClass())); break; } } } if(exceptionToPropagate != null) { - - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId, - operationName, exceptionToPropagate); - } + LOG.debug("Tx {}: a {} cohort Future failed", transactionId, operationName, exceptionToPropagate); if(propagateException) { // We don't log the exception here to avoid redundant logging since we're // propagating to the caller in MD-SAL core who will log it. @@ -295,19 +337,13 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< // Since the caller doesn't want us to propagate the exception we'll also // not log it normally. But it's usually not good to totally silence // exceptions so we'll log it to debug level. - if(LOG.isDebugEnabled()) { - LOG.debug(String.format("%s failed", message.getClass().getSimpleName()), - exceptionToPropagate); - } returnFuture.set(null); } callback.failure(); } else { + LOG.debug("Tx {}: {} succeeded", transactionId, operationName); - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {}: {} succeeded", transactionId, operationName); - } returnFuture.set(null); callback.success(); @@ -318,6 +354,45 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< @Override List> getCohortFutures() { - return Collections.unmodifiableList(cohortFutures); + List> cohortFutures = new ArrayList<>(cohorts.size()); + for(CohortInfo info: cohorts) { + cohortFutures.add(info.getActorFuture()); + } + + return cohortFutures; + } + + static class CohortInfo { + private final Future actorFuture; + private volatile ActorSelection resolvedActor; + private final Supplier actorVersionSupplier; + + CohortInfo(Future actorFuture, Supplier actorVersionSupplier) { + this.actorFuture = actorFuture; + this.actorVersionSupplier = actorVersionSupplier; + } + + Future getActorFuture() { + return actorFuture; + } + + ActorSelection getResolvedActor() { + return resolvedActor; + } + + void setResolvedActor(ActorSelection resolvedActor) { + this.resolvedActor = resolvedActor; + } + + short getActorVersion() { + Preconditions.checkState(resolvedActor != null, + "getActorVersion cannot be called until the actor is resolved"); + return actorVersionSupplier.get(); + } + } + + private interface MessageSupplier { + Object newMessage(String transactionId, short version); + boolean isSerializedReplyType(Object reply); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java index ab636ff493..ca03c3d60d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java @@ -45,4 +45,6 @@ interface TransactionContext { * @return */ boolean usesOperationLimiting(); + + short getTransactionVersion(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index f645608dd9..5dcba758f6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -12,6 +12,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; @@ -237,6 +238,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction createSingleCommitCohort(final String shardName, final TransactionContextWrapper contextWrapper) { @@ -281,14 +283,27 @@ public class TransactionProxy extends AbstractDOMStoreTransaction createMultiCommitCohort( final Set> txContextWrapperEntries) { - final List> cohortFutures = new ArrayList<>(txContextWrapperEntries.size()); + final List cohorts = new ArrayList<>(txContextWrapperEntries.size()); for (Entry e : txContextWrapperEntries) { LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey()); - cohortFutures.add(e.getValue().readyTransaction()); + final TransactionContextWrapper wrapper = e.getValue(); + + // The remote tx version is obtained the via TransactionContext which may not be available yet so + // we pass a Supplier to dynamically obtain it. Once the ready Future is resolved the + // TransactionContext is available. + Supplier txVersionSupplier = new Supplier() { + @Override + public Short get() { + return wrapper.getTransactionContext().getTransactionVersion(); + } + }; + + cohorts.add(new ThreePhaseCommitCohortProxy.CohortInfo(wrapper.readyTransaction(), txVersionSupplier)); } - return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohortFutures, getIdentifier().toString()); + return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohorts, + getIdentifier().toString()); } private String shardNameFromIdentifier(final YangInstanceIdentifier path) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java index 2442548f13..17dec1911e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java @@ -25,6 +25,7 @@ import akka.japi.Creator; import akka.testkit.TestActorRef; import com.google.common.base.Function; import com.google.common.base.Optional; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; import java.util.Collections; @@ -274,6 +275,15 @@ public abstract class AbstractShardTest extends AbstractActorTest{ } } + protected ShardDataTreeCohort mockShardDataTreeCohort() { + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class); + doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); + doReturn(Futures.immediateFuture(null)).when(cohort).preCommit(); + doReturn(Futures.immediateFuture(null)).when(cohort).commit(); + doReturn(mockCandidate("candidate")).when(cohort).getCandidate(); + return cohort; + } + static ShardDataTreeTransactionParent newShardDataTreeTransactionParent(ShardDataTreeCohort cohort) { ShardDataTreeTransactionParent mockParent = mock(ShardDataTreeTransactionParent.class); doReturn(cohort).when(mockParent).finishTransaction(any(ReadWriteShardDataTreeTransaction.class)); @@ -420,7 +430,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{ assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue()); } - static DataTreeCandidateTip mockCandidate(final String name) { + public static DataTreeCandidateTip mockCandidate(final String name) { final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name); final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node"); doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 499d5e22b9..55024b265a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -26,6 +26,7 @@ import akka.pattern.AskTimeoutException; import akka.pattern.Patterns; import akka.testkit.JavaTestKit; import com.google.common.base.Optional; +import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; @@ -539,6 +540,7 @@ public class DistributedDataStoreRemotingIntegrationTest { verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1); } + @SuppressWarnings("unchecked") @Test public void testReadyLocalTransactionForwardedToLeader() throws Exception { initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader"); @@ -592,8 +594,11 @@ public class DistributedDataStoreRemotingIntegrationTest { ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection( ((ReadyTransactionReply)resp).getCohortPath()); + Supplier versionSupplier = Mockito.mock(Supplier.class); + Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get(); ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy( - leaderDistributedDataStore.getActorContext(), Arrays.asList(Futures.successful(txActor)), "tx-2"); + leaderDistributedDataStore.getActorContext(), Arrays.asList( + new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), "tx-2"); cohort.canCommit().get(5, TimeUnit.SECONDS); cohort.preCommit().get(5, TimeUnit.SECONDS); cohort.commit().get(5, TimeUnit.SECONDS); @@ -601,6 +606,7 @@ public class DistributedDataStoreRemotingIntegrationTest { verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2); } + @SuppressWarnings("unchecked") @Test public void testForwardedReadyTransactionForwardedToLeader() throws Exception { initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader"); @@ -656,8 +662,11 @@ public class DistributedDataStoreRemotingIntegrationTest { ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection( ((ReadyTransactionReply)resp).getCohortPath()); + Supplier versionSupplier = Mockito.mock(Supplier.class); + Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get(); ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy( - leaderDistributedDataStore.getActorContext(), Arrays.asList(Futures.successful(txActor)), "tx-2"); + leaderDistributedDataStore.getActorContext(), Arrays.asList( + new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), "tx-2"); cohort.canCommit().get(5, TimeUnit.SECONDS); cohort.preCommit().get(5, TimeUnit.SECONDS); cohort.commit().get(5, TimeUnit.SECONDS); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java index 0ab92dda89..e5db5cbbaf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java @@ -9,41 +9,46 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION; -import akka.actor.ActorPath; import akka.actor.ActorSelection; import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.dispatch.Dispatchers; import akka.dispatch.Futures; -import akka.util.Timeout; +import akka.testkit.TestActorRef; import com.codahale.metrics.Snapshot; import com.codahale.metrics.Timer; -import com.google.common.collect.Lists; +import com.google.common.base.Supplier; import com.google.common.util.concurrent.ListenableFuture; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.mockito.stubbing.Stubber; +import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy.CohortInfo; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.AbstractThreePhaseCommitMessage; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; +import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; +import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; +import org.opendaylight.controller.cluster.raft.TestActorFactory; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; -import scala.concurrent.Future; -import scala.concurrent.duration.Duration; +import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages; public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { @@ -51,12 +56,8 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { static class TestException extends RuntimeException { } - @Mock private ActorContext actorContext; - @Mock - private DatastoreContext datastoreContext; - @Mock private Timer commitTimer; @@ -66,15 +67,27 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { @Mock private Snapshot commitSnapshot; + private final TestActorFactory actorFactory = new TestActorFactory(getSystem()); + private final List> cohortActors = new ArrayList<>(); + @Before public void setUp() { MockitoAnnotations.initMocks(this); - doReturn(getSystem()).when(actorContext).getActorSystem(); - doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher(); - doReturn(datastoreContext).when(actorContext).getDatastoreContext(); - doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds(); - doReturn(commitTimer).when(actorContext).getOperationTimer("commit"); + actorContext = new ActorContext(getSystem(), actorFactory.createActor(Props.create(DoNothingActor.class)), + new MockClusterWrapper(), new MockConfiguration(), + DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache()) { + @Override + public Timer getOperationTimer(String operationName) { + return commitTimer; + } + + @Override + public double getTxCreationLimit() { + return 10.0; + } + }; + doReturn(commitTimerContext).when(commitTimer).time(); doReturn(commitSnapshot).when(commitTimer).getSnapshot(); for(int i=1;i<11;i++){ @@ -82,274 +95,353 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on. doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1); } - doReturn(10.0).when(actorContext).getTxCreationLimit(); - } - - private Future newCohort() { - ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path(); - ActorSelection actorSelection = getSystem().actorSelection(path); - return Futures.successful(actorSelection); - } - - private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception { - List> cohortFutures = Lists.newArrayList(); - for(int i = 1; i <= nCohorts; i++) { - cohortFutures.add(newCohort()); - } - - return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1"); } - private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath() - throws Exception { - List> cohortFutures = Lists.newArrayList(); - cohortFutures.add(newCohort()); - cohortFutures.add(Futures.failed(new TestException())); - - return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1"); - } - - private void setupMockActorContext(Class requestType, Object... responses) { - Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures - .failed((Throwable) responses[0]) : Futures - .successful(((SerializableMessage) responses[0]).toSerializable())); - - for(int i = 1; i < responses.length; i++) { - stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures - .failed((Throwable) responses[i]) : Futures - .successful(((SerializableMessage) responses[i]).toSerializable())); - } - - stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class), - isA(requestType), any(Timeout.class)); - - doReturn(new Timeout(Duration.apply(1000, TimeUnit.MILLISECONDS))) - .when(actorContext).getTransactionCommitOperationTimeout(); - } - - private void verifyCohortInvocations(int nCohorts, Class requestType) { - verify(actorContext, times(nCohorts)).executeOperationAsync( - any(ActorSelection.class), isA(requestType), any(Timeout.class)); - } - - private static void propagateExecutionExceptionCause(ListenableFuture future) throws Throwable { + @Test + public void testCanCommitYesWithOneCohort() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList( + newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit( + CanCommitTransactionReply.yes(CURRENT_VERSION)))), "txn-1"); - try { - future.get(5, TimeUnit.SECONDS); - fail("Expected ExecutionException"); - } catch(ExecutionException e) { - throw e.getCause(); - } + verifyCanCommit(proxy.canCommit(), true); + verifyCohortActors(); } @Test - public void testCanCommitWithOneCohort() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - setupMockActorContext(CanCommitTransaction.class, CanCommitTransactionReply.yes(CURRENT_VERSION)); - - ListenableFuture future = proxy.canCommit(); - - assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS)); + public void testCanCommitNoWithOneCohort() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList( + newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit( + CanCommitTransactionReply.no(CURRENT_VERSION)))), "txn-1"); - setupMockActorContext(CanCommitTransaction.class, CanCommitTransactionReply.no(CURRENT_VERSION)); - - future = proxy.canCommit(); - - assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS)); - - verifyCohortInvocations(2, CanCommitTransaction.class); + verifyCanCommit(proxy.canCommit(), false); + verifyCohortActors(); } @Test - public void testCanCommitWithMultipleCohorts() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(2); - - setupMockActorContext(CanCommitTransaction.class, - CanCommitTransactionReply.yes(CURRENT_VERSION), CanCommitTransactionReply.yes(CURRENT_VERSION)); - - ListenableFuture future = proxy.canCommit(); - - assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS)); - - verifyCohortInvocations(2, CanCommitTransaction.class); + public void testCanCommitYesWithTwoCohorts() throws Exception { + List cohorts = Arrays.asList( + newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit( + CanCommitTransactionReply.yes(CURRENT_VERSION))), + newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit( + CanCommitTransactionReply.yes(CURRENT_VERSION)))); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1"); + + verifyCanCommit(proxy.canCommit(), true); + verifyCohortActors(); } @Test - public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(3); - - setupMockActorContext(CanCommitTransaction.class, - CanCommitTransactionReply.yes(CURRENT_VERSION), CanCommitTransactionReply.no(CURRENT_VERSION), - CanCommitTransactionReply.yes(CURRENT_VERSION)); - - ListenableFuture future = proxy.canCommit(); - - Boolean actual = future.get(5, TimeUnit.SECONDS); - - assertEquals("canCommit", false, actual); - - verifyCohortInvocations(2, CanCommitTransaction.class); + public void testCanCommitNoWithThreeCohorts() throws Exception { + List cohorts = Arrays.asList( + newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit( + CanCommitTransactionReply.yes(CURRENT_VERSION))), + newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit( + CanCommitTransactionReply.no(CURRENT_VERSION))), + newCohortInfo(new CohortActor.Builder("txn-1"))); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1"); + + verifyCanCommit(proxy.canCommit(), false); + verifyCohortActors(); } @Test(expected = TestException.class) public void testCanCommitWithExceptionFailure() throws Throwable { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList( + newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(new TestException()))), "txn-1"); - ThreePhaseCommitCohortProxy proxy = setupProxy(1); + propagateExecutionExceptionCause(proxy.canCommit()); + } - setupMockActorContext(CanCommitTransaction.class, new TestException()); + @Test(expected = IllegalArgumentException.class) + public void testCanCommitWithInvalidResponseType() throws Throwable { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList( + newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit("invalid"))), "txn-1"); propagateExecutionExceptionCause(proxy.canCommit()); } - @Test(expected = ExecutionException.class) - public void testCanCommitWithInvalidResponseType() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(1); + @Test(expected = TestException.class) + public void testCanCommitWithFailedCohortFuture() throws Throwable { + List cohorts = Arrays.asList( + newCohortInfo(new CohortActor.Builder("txn-1")), + newCohortInfoWithFailedFuture(new TestException()), + newCohortInfo(new CohortActor.Builder("txn-1"))); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1"); - setupMockActorContext(CanCommitTransaction.class, - new CommitTransactionReply()); + propagateExecutionExceptionCause(proxy.canCommit()); + } - proxy.canCommit().get(5, TimeUnit.SECONDS); + @Test + public void testAllThreePhasesSuccessful() throws Exception { + List cohorts = Arrays.asList( + newCohortInfo(new CohortActor.Builder("txn-1"). + expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)). + expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))), + newCohortInfo(new CohortActor.Builder("txn-1"). + expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)). + expectCommit(CommitTransactionReply.instance(CURRENT_VERSION)))); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1"); + + verifyCanCommit(proxy.canCommit(), true); + verifySuccessfulFuture(proxy.preCommit()); + verifySuccessfulFuture(proxy.commit()); + verifyCohortActors(); } @Test(expected = TestException.class) - public void testCanCommitWithFailedCohortPath() throws Throwable { - - ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath(); - - try { - propagateExecutionExceptionCause(proxy.canCommit()); - } finally { - verifyCohortInvocations(0, CanCommitTransaction.class); - } + public void testCommitWithExceptionFailure() throws Throwable { + List cohorts = Arrays.asList( + newCohortInfo(new CohortActor.Builder("txn-1"). + expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)). + expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))), + newCohortInfo(new CohortActor.Builder("txn-1"). + expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)). + expectCommit(new TestException()))); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1"); + + verifyCanCommit(proxy.canCommit(), true); + verifySuccessfulFuture(proxy.preCommit()); + propagateExecutionExceptionCause(proxy.commit()); } - @Test - public void testPreCommit() throws Exception { - // Precommit is currently a no-op - ThreePhaseCommitCohortProxy proxy = setupProxy(1); + @Test(expected = IllegalArgumentException.class) + public void testCommitWithInvalidResponseType() throws Throwable { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList( + newCohortInfo(new CohortActor.Builder("txn-1"). + expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)). + expectCommit("invalid"))), "txn-1"); - proxy.preCommit().get(5, TimeUnit.SECONDS); + verifyCanCommit(proxy.canCommit(), true); + verifySuccessfulFuture(proxy.preCommit()); + propagateExecutionExceptionCause(proxy.commit()); } @Test public void testAbort() throws Exception { - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - setupMockActorContext(AbortTransaction.class, new AbortTransactionReply()); - - proxy.abort().get(5, TimeUnit.SECONDS); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList( + newCohortInfo(new CohortActor.Builder("txn-1").expectAbort( + AbortTransactionReply.instance(CURRENT_VERSION)))), "txn-1"); - verifyCohortInvocations(1, AbortTransaction.class); + verifySuccessfulFuture(proxy.abort()); + verifyCohortActors(); } @Test public void testAbortWithFailure() throws Exception { - ThreePhaseCommitCohortProxy proxy = setupProxy(1); - - setupMockActorContext(AbortTransaction.class, new RuntimeException("mock")); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList( + newCohortInfo(new CohortActor.Builder("txn-1").expectAbort(new RuntimeException("mock")))), "txn-1"); // The exception should not get propagated. - proxy.abort().get(5, TimeUnit.SECONDS); - - verifyCohortInvocations(1, AbortTransaction.class); + verifySuccessfulFuture(proxy.abort()); + verifyCohortActors(); } @Test - public void testAbortWithFailedCohortPath() throws Throwable { - - ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath(); - - // The exception should not get propagated. - proxy.abort().get(5, TimeUnit.SECONDS); - - verifyCohortInvocations(0, AbortTransaction.class); + public void testAbortWithFailedCohortFuture() throws Throwable { + List cohorts = Arrays.asList( + newCohortInfoWithFailedFuture(new TestException()), + newCohortInfo(new CohortActor.Builder("txn-1"))); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1"); + + verifySuccessfulFuture(proxy.abort()); + verifyCohortActors(); } @Test - public void testCommit() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(2); - - setupMockActorContext(CommitTransaction.class, new CommitTransactionReply(), - new CommitTransactionReply()); - - proxy.commit().get(5, TimeUnit.SECONDS); - - verifyCohortInvocations(2, CommitTransaction.class); + public void testWithNoCohorts() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, + Collections.emptyList(), "txn-1"); + + verifyCanCommit(proxy.canCommit(), true); + verifySuccessfulFuture(proxy.preCommit()); + verifySuccessfulFuture(proxy.commit()); + verifyCohortActors(); } - @Test(expected = TestException.class) - public void testCommitWithFailure() throws Throwable { - - ThreePhaseCommitCohortProxy proxy = setupProxy(2); - - setupMockActorContext(CommitTransaction.class, new CommitTransactionReply(), - new TestException()); - - propagateExecutionExceptionCause(proxy.commit()); + @Test + public void testBackwardsCompatibilityWithPreBoron() throws Exception { + List cohorts = Arrays.asList( + newCohortInfo(new CohortActor.Builder("txn-1"). + expectCanCommit(ThreePhaseCommitCohortMessages.CanCommitTransaction.class, + CanCommitTransactionReply.yes(DataStoreVersions.LITHIUM_VERSION)). + expectCommit(ThreePhaseCommitCohortMessages.CommitTransaction.class, + CommitTransactionReply.instance(DataStoreVersions.LITHIUM_VERSION)), + DataStoreVersions.LITHIUM_VERSION)); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1"); + + verifyCanCommit(proxy.canCommit(), true); + verifySuccessfulFuture(proxy.preCommit()); + verifySuccessfulFuture(proxy.commit()); + verifyCohortActors(); } - @Test(expected = ExecutionException.class) - public void testCommitWithInvalidResponseType() throws Exception { + private void propagateExecutionExceptionCause(ListenableFuture future) throws Throwable { - ThreePhaseCommitCohortProxy proxy = setupProxy(1); + try { + future.get(5, TimeUnit.SECONDS); + fail("Expected ExecutionException"); + } catch(ExecutionException e) { + verifyCohortActors(); + throw e.getCause(); + } + } - setupMockActorContext(CommitTransaction.class, new AbortTransactionReply()); + private CohortInfo newCohortInfo(CohortActor.Builder builder, final short version) { + TestActorRef actor = actorFactory.createTestActor(builder.props(). + withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("cohort")); + cohortActors.add(actor); + return new CohortInfo(Futures.successful(getSystem().actorSelection(actor.path())), new Supplier() { + @Override + public Short get() { + return version; + } + }); + } - proxy.commit().get(5, TimeUnit.SECONDS); + private CohortInfo newCohortInfoWithFailedFuture(Exception failure) { + return new CohortInfo(Futures.failed(failure), new Supplier() { + @Override + public Short get() { + return CURRENT_VERSION; + } + }); } - @Test(expected = TestException.class) - public void testCommitWithFailedCohortPath() throws Throwable { + private CohortInfo newCohortInfo(CohortActor.Builder builder) { + return newCohortInfo(builder, CURRENT_VERSION); + } - ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath(); + private void verifyCohortActors() { + for(TestActorRef actor: cohortActors) { + actor.underlyingActor().verify(); + } + } + private T verifySuccessfulFuture(ListenableFuture future) throws Exception { try { - propagateExecutionExceptionCause(proxy.commit()); - } finally { - - verifyCohortInvocations(0, CommitTransaction.class); + return future.get(5, TimeUnit.SECONDS); + } catch(Exception e) { + verifyCohortActors(); + throw e; } - } - @Test - public void testAllThreePhasesSuccessful() throws Exception { - - ThreePhaseCommitCohortProxy proxy = setupProxy(2); - - setupMockActorContext(CanCommitTransaction.class, - CanCommitTransactionReply.yes(CURRENT_VERSION), CanCommitTransactionReply.yes(CURRENT_VERSION)); - - setupMockActorContext(CommitTransaction.class, - new CommitTransactionReply(), new CommitTransactionReply()); + private void verifyCanCommit(ListenableFuture future, boolean expected) throws Exception { + Boolean actual = verifySuccessfulFuture(future); + assertEquals("canCommit", expected, actual); + } - assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15); + private static class CohortActor extends UntypedActor { + private final Builder builder; + private final AtomicInteger canCommitCount = new AtomicInteger(); + private final AtomicInteger commitCount = new AtomicInteger(); + private final AtomicInteger abortCount = new AtomicInteger(); + private volatile AssertionError assertionError; - proxy.canCommit().get(5, TimeUnit.SECONDS); - proxy.preCommit().get(5, TimeUnit.SECONDS); - proxy.commit().get(5, TimeUnit.SECONDS); + private CohortActor(Builder builder) { + this.builder = builder; + } - verifyCohortInvocations(2, CanCommitTransaction.class); - verifyCohortInvocations(2, CommitTransaction.class); + @Override + public void onReceive(Object message) { + if(CanCommitTransaction.isSerializedType(message)) { + onMessage("CanCommitTransaction", message, CanCommitTransaction.fromSerializable(message), + builder.expCanCommitType, builder.canCommitReply); + canCommitCount.incrementAndGet(); + } else if(CommitTransaction.isSerializedType(message)) { + onMessage("CommitTransaction", message, CommitTransaction.fromSerializable(message), + builder.expCommitType, builder.commitReply); + commitCount.incrementAndGet(); + } else if(AbortTransaction.isSerializedType(message)) { + onMessage("AbortTransaction", message, AbortTransaction.fromSerializable(message), + builder.expAbortType, builder.abortReply); + abortCount.incrementAndGet(); + } else { + assertionError = new AssertionError("Unexpected message " + message); + } + } - } + private void onMessage(String name, Object rawMessage, AbstractThreePhaseCommitMessage actualMessage, + Class expType, Object reply) { + try { + assertNotNull("Unexpected " + name, expType); + assertEquals(name + " type", expType, rawMessage.getClass()); + assertEquals(name + " transactionId", builder.transactionId, actualMessage.getTransactionID()); + + if(reply instanceof Throwable) { + getSender().tell(new akka.actor.Status.Failure((Throwable)reply), self()); + } else { + getSender().tell(reply, self()); + } + } catch(AssertionError e) { + assertionError = e; + } + } - @Test - public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception { + void verify() { + if(assertionError != null) { + throw assertionError; + } - ThreePhaseCommitCohortProxy proxy = setupProxy(0); + if(builder.expCanCommitType != null) { + assertEquals("CanCommitTransaction count", 1, canCommitCount.get()); + } - assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15); + if(builder.expCommitType != null) { + assertEquals("CommitTransaction count", 1, commitCount.get()); + } - proxy.canCommit().get(5, TimeUnit.SECONDS); - proxy.preCommit().get(5, TimeUnit.SECONDS); - proxy.commit().get(5, TimeUnit.SECONDS); + if(builder.expAbortType != null) { + assertEquals("AbortTransaction count", 1, abortCount.get()); + } + } + static class Builder { + private Class expCanCommitType; + private Class expCommitType; + private Class expAbortType; + private Object canCommitReply; + private Object commitReply; + private Object abortReply; + private final String transactionId; + + Builder(String transactionId) { + this.transactionId = transactionId; + } + + Builder expectCanCommit(Class expCanCommitType, Object canCommitReply) { + this.expCanCommitType = expCanCommitType; + this.canCommitReply = canCommitReply; + return this; + } + + Builder expectCanCommit(Object canCommitReply) { + return expectCanCommit(CanCommitTransaction.class, canCommitReply); + } + + Builder expectCommit(Class expCommitType, Object commitReply) { + this.expCommitType = expCommitType; + this.commitReply = commitReply; + return this; + } + + Builder expectCommit(Object commitReply) { + return expectCommit(CommitTransaction.class, commitReply); + } + + Builder expectAbort(Class expAbortType, Object abortReply) { + this.expAbortType = expAbortType; + this.abortReply = abortReply; + return this; + } + + Builder expectAbort(Object abortReply) { + return expectAbort(AbortTransaction.class, abortReply); + } + + Props props() { + return Props.create(CohortActor.class, this); + } + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreBoronShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreBoronShardTest.java index a6077e23e4..4499043b14 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreBoronShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreBoronShardTest.java @@ -7,18 +7,30 @@ */ package org.opendaylight.controller.cluster.datastore.compat; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import akka.actor.ActorRef; +import akka.dispatch.Dispatchers; +import akka.testkit.TestActorRef; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.AbstractShardTest; import org.opendaylight.controller.cluster.datastore.DataStoreVersions; +import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.ShardTestKit; import org.opendaylight.controller.cluster.datastore.TransactionType; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; +import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; /** * Shard unit tests for backwards compatibility with pre-Boron versions. @@ -47,4 +59,71 @@ public class PreBoronShardTest extends AbstractShardTest { path.contains("akka://test/user/testCreateTransaction/shard-txn-1")); }}; } + + @Test + public void testBatchedModificationsWithCommitOnReady() throws Throwable { + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = actorFactory.createTestActor( + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testBatchedModificationsWithCommitOnReady"); + + waitUntilLeader(shard); + + final String transactionID = "tx"; + + BatchedModifications batched = new BatchedModifications(transactionID, + DataStoreVersions.LITHIUM_VERSION, ""); + batched.addModification(new WriteModification(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME))); + batched.setReady(true); + batched.setDoCommitOnReady(true); + batched.setTotalMessagesSent(1); + + shard.tell(batched, getRef()); + expectMsgClass(ThreePhaseCommitCohortMessages.CommitTransactionReply.class); + }}; + } + + @Test + public void testImmediateCommitWithForwardedReadyTransaction() throws Throwable { + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = actorFactory.createTestActor( + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testImmediateCommitWithForwardedReadyTransaction"); + + waitUntilLeader(shard); + + final String transactionID = "tx"; + + shard.tell(prepareForwardedReadyTransaction(mockShardDataTreeCohort(), transactionID, + DataStoreVersions.LITHIUM_VERSION, true), getRef()); + + expectMsgClass(ThreePhaseCommitCohortMessages.CommitTransactionReply.class); + }}; + } + + @Test + public void testThreePhaseCommitOnForwardedReadyTransaction() throws Throwable { + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = actorFactory.createTestActor( + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testThreePhaseCommitOnForwardedReadyTransaction"); + + waitUntilLeader(shard); + + final String transactionID = "tx"; + + shard.tell(prepareForwardedReadyTransaction(mockShardDataTreeCohort(), transactionID, + DataStoreVersions.LITHIUM_VERSION, false), getRef()); + expectMsgClass(ReadyTransactionReply.class); + + shard.tell(new CanCommitTransaction(transactionID, DataStoreVersions.LITHIUM_VERSION).toSerializable(), getRef()); + CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( + expectMsgClass(ThreePhaseCommitCohortMessages.CanCommitTransactionReply.class)); + assertEquals("Can commit", true, canCommitReply.getCanCommit()); + + shard.tell(new CommitTransaction(transactionID, DataStoreVersions.LITHIUM_VERSION).toSerializable(), getRef()); + expectMsgClass(ThreePhaseCommitCohortMessages.CommitTransactionReply.class); + }}; + } }