From c6c9b43923bbe8bc6d586ce09649324949e6b092 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Fri, 17 Apr 2015 18:53:21 -0400 Subject: [PATCH] Elide front-end 3PC for single-shard Tx A new method, directCommit, was added to TransactionContext. In TransactionContextImpl#directCommit, it sends the final ready BatchedModifications message as before but with a new flag, doCommitOnReady, set to true. In ShardCommitCoordinator, if doCommitOnReady is true, it immediately initiates the canCommit phase without waiting for the CanCommitTransaction message. In ShardWriteTransaction, if doCommitOnReady is true, it sets a new similar flag in the ForwardedReadyTransaction message. Similarly, the ShardCommitCoordinator immediately initiates the canCommit phase. This code path is executed for read-write transactions as they still utilize the transaction actor. In TransactionProxy, when only 1 shard was accessed, it creates a SingleCommitCohortProxy instance. This class also implements DOMStoreThreePhaseCommitCohort but the 3 phases are essentially a no-op with direct commit. The canCommit phase simply waits for the direct commit Future to complete and returns success (true). If a failure occurs from the Shard, the exception is propagated to the caller. For backwards compatibility, we still need the ThreePhaseCommitCohortProxy even with a single-shard transaction. A complication with this is that TransactionProxy#ready may not know immediately if it can do direct commit or not because it may not have the TransactionContext instance yet due to the async nature of things. So in either case it still creates a SingleCommitCohortProxy. When it gets the callback to finally complete the operation, it checks the TransactionContext to see if it supports direct commit (only pre-Lithium versions won't). If supported, it calls directCommit, otherwise readyTransaction. In the SingleCommitCohortProxy, on successful completion of the ready/direct commit Future, it checks the response type. If it's an ActorSelection then it needs to do 3-phase commit so it creates and delegates to a ThreePhaseCommitCohortProxy. I moved the code in Shard#handleCommitTransaction to the ShardCommitCoordinator as this is also needed for direct commit. I also moved the handleForwardedReadyTransaction code into ShardCommitCoordinator to make it easier to handle direct commit. Change-Id: I40b04dd5abd24c78709598a5acfc16484e165427 Signed-off-by: Tom Pantelis --- .../AbstractThreePhaseCommitCohort.java | 10 +- .../datastore/ChainedTransactionProxy.java | 21 +- .../cluster/datastore/DatastoreContext.java | 2 +- .../NoOpDOMStoreThreePhaseCommitCohort.java | 9 +- .../datastore/NoOpTransactionContext.java | 14 +- .../cluster/datastore/OperationCallback.java | 26 +- .../controller/cluster/datastore/Shard.java | 107 +---- .../datastore/ShardCommitCoordinator.java | 251 ++++++++--- .../datastore/ShardWriteTransaction.java | 11 +- .../datastore/SingleCommitCohortProxy.java | 122 +++++ .../ThreePhaseCommitCohortProxy.java | 25 +- .../datastore/TransactionChainProxy.java | 7 +- .../cluster/datastore/TransactionContext.java | 4 + .../datastore/TransactionContextImpl.java | 21 +- .../cluster/datastore/TransactionProxy.java | 52 ++- .../PreLithiumTransactionContextImpl.java | 10 + .../messages/BatchedModifications.java | 11 + .../messages/ForwardedReadyTransaction.java | 8 +- .../cluster/datastore/AbstractShardTest.java | 6 +- .../AbstractTransactionProxyTest.java | 111 ++++- .../DistributedDataStoreIntegrationTest.java | 10 +- .../cluster/datastore/ShardTest.java | 415 +++++++++++++++--- .../datastore/ShardTransactionTest.java | 32 +- .../datastore/TransactionProxyTest.java | 108 ++--- .../datastore/compat/PreLithiumShardTest.java | 6 +- .../PreLithiumTransactionProxyTest.java | 58 ++- 26 files changed, 1084 insertions(+), 373 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SingleCommitCohortProxy.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractThreePhaseCommitCohort.java index cac0f51354..7c56f261ed 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractThreePhaseCommitCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractThreePhaseCommitCohort.java @@ -7,7 +7,8 @@ */ package org.opendaylight.controller.cluster.datastore; -import akka.actor.ActorSelection; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import java.util.List; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import scala.concurrent.Future; @@ -17,6 +18,9 @@ import scala.concurrent.Future; * implementation. In addition to the usual set of methods it also contains the list of actor * futures. */ -abstract class AbstractThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort { - abstract List> getCohortFutures(); +public abstract class AbstractThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort { + protected static final ListenableFuture IMMEDIATE_VOID_SUCCESS = Futures.immediateFuture(null); + protected static final ListenableFuture IMMEDIATE_BOOLEAN_SUCCESS = Futures.immediateFuture(Boolean.TRUE); + + abstract List> getCohortFutures(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java index ed3aa85c1f..f1f33bfc93 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java @@ -22,20 +22,20 @@ final class ChainedTransactionProxy extends TransactionProxy { /** * Stores the ready Futures from the previous Tx in the chain. */ - private final List> previousReadyFutures; + private final List> previousReadyFutures; /** * Stores the ready Futures from this transaction when it is readied. */ - private volatile List> readyFutures; + private volatile List> readyFutures; ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType, - String transactionChainId, List> previousReadyFutures) { + String transactionChainId, List> previousReadyFutures) { super(actorContext, transactionType, transactionChainId); this.previousReadyFutures = previousReadyFutures; } - List> getReadyFutures() { + List> getReadyFutures() { return readyFutures; } @@ -43,10 +43,11 @@ final class ChainedTransactionProxy extends TransactionProxy { return readyFutures != null; } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Override - public AbstractThreePhaseCommitCohort ready() { - final AbstractThreePhaseCommitCohort ret = super.ready(); - readyFutures = ret.getCohortFutures(); + public AbstractThreePhaseCommitCohort ready() { + final AbstractThreePhaseCommitCohort ret = super.ready(); + readyFutures = (List)ret.getCohortFutures(); LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(), readyFutures.size(), getTransactionChainId()); return ret; @@ -70,14 +71,14 @@ final class ChainedTransactionProxy extends TransactionProxy { } // Combine the ready Futures into 1. - Future> combinedFutures = akka.dispatch.Futures.sequence( + Future> combinedFutures = akka.dispatch.Futures.sequence( previousReadyFutures, getActorContext().getClientDispatcher()); // Add a callback for completion of the combined Futures. final Promise returnPromise = akka.dispatch.Futures.promise(); - OnComplete> onComplete = new OnComplete>() { + OnComplete> onComplete = new OnComplete>() { @Override - public void onComplete(Throwable failure, Iterable notUsed) { + public void onComplete(Throwable failure, Iterable notUsed) { if(failure != null) { // A Ready Future failed so fail the returned Promise. returnPromise.failure(failure); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java index c3cc499865..8ae79ceb2d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java @@ -63,7 +63,7 @@ public class DatastoreContext { private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl(); private String dataStoreType = UNKNOWN_DATA_STORE_TYPE; private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT; - private boolean writeOnlyTransactionOptimizationsEnabled = false; + private boolean writeOnlyTransactionOptimizationsEnabled = true; public static Set getGlobalDatastoreTypes() { return globalDatastoreTypes; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDOMStoreThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDOMStoreThreePhaseCommitCohort.java index 376b658046..1f5f5bcf79 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDOMStoreThreePhaseCommitCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDOMStoreThreePhaseCommitCohort.java @@ -7,8 +7,6 @@ */ package org.opendaylight.controller.cluster.datastore; -import akka.actor.ActorSelection; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import java.util.Collections; import java.util.List; @@ -18,12 +16,9 @@ import scala.concurrent.Future; * A {@link org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort} * instance given out for empty transactions. */ -final class NoOpDOMStoreThreePhaseCommitCohort extends AbstractThreePhaseCommitCohort { +final class NoOpDOMStoreThreePhaseCommitCohort extends AbstractThreePhaseCommitCohort { static final NoOpDOMStoreThreePhaseCommitCohort INSTANCE = new NoOpDOMStoreThreePhaseCommitCohort(); - private static final ListenableFuture IMMEDIATE_VOID_SUCCESS = Futures.immediateFuture(null); - private static final ListenableFuture IMMEDIATE_BOOLEAN_SUCCESS = Futures.immediateFuture(Boolean.TRUE); - private NoOpDOMStoreThreePhaseCommitCohort() { // Hidden to prevent instantiation } @@ -49,7 +44,7 @@ final class NoOpDOMStoreThreePhaseCommitCohort extends AbstractThreePhaseCommitC } @Override - List> getCohortFutures() { + List> getCohortFutures() { return Collections.emptyList(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java index 672560bbdd..197cd9fa83 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java @@ -36,9 +36,21 @@ final class NoOpTransactionContext extends AbstractTransactionContext { LOG.debug("NoOpTransactionContext {} closeTransaction called", getIdentifier()); } + @Override + public boolean supportsDirectCommit() { + return true; + } + + @Override + public Future directCommit() { + LOG.debug("Tx {} directCommit called, failure: {}", getIdentifier(), failure); + operationLimiter.release(); + return akka.dispatch.Futures.failed(failure); + } + @Override public Future readyTransaction() { - LOG.debug("Tx {} readyTransaction called", getIdentifier()); + LOG.debug("Tx {} readyTransaction called, failure: {}", getIdentifier(), failure); operationLimiter.release(); return akka.dispatch.Futures.failed(failure); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCallback.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCallback.java index b9445361bb..592e6bc9c9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCallback.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCallback.java @@ -8,7 +8,31 @@ package org.opendaylight.controller.cluster.datastore; -public interface OperationCallback { +import java.util.concurrent.atomic.AtomicReference; + +interface OperationCallback { + OperationCallback NO_OP_CALLBACK = new OperationCallback() { + @Override + public void run() { + } + + @Override + public void success() { + } + + @Override + public void failure() { + } + }; + + class Reference extends AtomicReference { + private static final long serialVersionUID = 1L; + + public Reference(OperationCallback initialValue) { + super(initialValue); + } + } + void run(); void success(); void failure(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 7110adc625..91e072b076 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -30,7 +30,6 @@ import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.common.actor.CommonConfig; import org.opendaylight.controller.cluster.common.actor.MeteringBehavior; import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry; -import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; @@ -40,7 +39,6 @@ import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; -import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; @@ -49,7 +47,6 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; -import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; @@ -107,9 +104,6 @@ public class Shard extends RaftActor { private final MessageTracker appendEntriesReplyTracker; - private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply( - Serialization.serializedActorPath(getSelf())); - private final DOMTransactionFactory domTransactionFactory; private final ShardTransactionActorFactory transactionActorFactory; @@ -240,7 +234,8 @@ public class Shard extends RaftActor { } else if (BatchedModifications.class.isInstance(message)) { handleBatchedModifications((BatchedModifications)message); } else if (message instanceof ForwardedReadyTransaction) { - handleForwardedReadyTransaction((ForwardedReadyTransaction) message); + commitCoordinator.handleForwardedReadyTransaction((ForwardedReadyTransaction) message, + getSender(), this); } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) { handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message)); } else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) { @@ -310,54 +305,22 @@ public class Shard extends RaftActor { } } - private void handleCommitTransaction(final CommitTransaction commit) { - final String transactionID = commit.getTransactionID(); - - LOG.debug("{}: Committing transaction {}", persistenceId(), transactionID); - - // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to - // this transaction. - final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID); - if(cohortEntry == null) { - // We're not the current Tx - the Tx was likely expired b/c it took too long in - // between the canCommit and commit messages. - IllegalStateException ex = new IllegalStateException( - String.format("%s: Cannot commit transaction %s - it is not the current transaction", - persistenceId(), transactionID)); - LOG.error(ex.getMessage()); - shardMBean.incrementFailedTransactionsCount(); - getSender().tell(new akka.actor.Status.Failure(ex), getSelf()); - return; + void continueCommit(final CohortEntry cohortEntry) throws Exception { + // If we do not have any followers and we are not using persistence + // or if cohortEntry has no modifications + // we can apply modification to the state immediately + if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){ + applyModificationToState(getSender(), cohortEntry.getTransactionID(), cohortEntry.getModification()); + } else { + Shard.this.persistData(getSender(), cohortEntry.getTransactionID(), + new ModificationPayload(cohortEntry.getModification())); } + } - // We perform the preCommit phase here atomically with the commit phase. This is an - // optimization to eliminate the overhead of an extra preCommit message. We lose front-end - // coordination of preCommit across shards in case of failure but preCommit should not - // normally fail since we ensure only one concurrent 3-phase commit. - - try { - // We block on the future here so we don't have to worry about possibly accessing our - // state on a different thread outside of our dispatcher. Also, the data store - // currently uses a same thread executor anyway. - cohortEntry.getCohort().preCommit().get(); - - // If we do not have any followers and we are not using persistence - // or if cohortEntry has no modifications - // we can apply modification to the state immediately - if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){ - applyModificationToState(getSender(), transactionID, cohortEntry.getModification()); - } else { - Shard.this.persistData(getSender(), transactionID, - new ModificationPayload(cohortEntry.getModification())); - } - } catch (Exception e) { - LOG.error("{} An exception occurred while preCommitting transaction {}", - persistenceId(), cohortEntry.getTransactionID(), e); + private void handleCommitTransaction(final CommitTransaction commit) { + if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) { shardMBean.incrementFailedTransactionsCount(); - getSender().tell(new akka.actor.Status.Failure(e), getSelf()); } - - cohortEntry.updateLastAccessTime(); } private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) { @@ -415,7 +378,7 @@ public class Shard extends RaftActor { private void handleCanCommitTransaction(final CanCommitTransaction canCommit) { LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID()); - commitCoordinator.handleCanCommit(canCommit, getSender(), self()); + commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this); } private void handleBatchedModifications(BatchedModifications batched) { @@ -433,12 +396,7 @@ public class Shard extends RaftActor { // if(isLeader()) { try { - boolean ready = commitCoordinator.handleTransactionModifications(batched); - if(ready) { - sender().tell(READY_TRANSACTION_REPLY, self()); - } else { - sender().tell(new BatchedModificationsReply(batched.getModifications().size()), self()); - } + commitCoordinator.handleBatchedModifications(batched, getSender(), this); } catch (Exception e) { LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(), batched.getTransactionID(), e); @@ -463,39 +421,6 @@ public class Shard extends RaftActor { } } - private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) { - LOG.debug("{}: Readying transaction {}, client version {}", persistenceId(), - ready.getTransactionID(), ready.getTxnClientVersion()); - - // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the - // commitCoordinator in preparation for the subsequent three phase commit initiated by - // the front-end. - commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(), - (MutableCompositeModification) ready.getModification()); - - // Return our actor path as we'll handle the three phase commit, except if the Tx client - // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version - // node. In that case, the subsequent 3-phase commit messages won't contain the - // transactionId so to maintain backwards compatibility, we create a separate cohort actor - // to provide the compatible behavior. - if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) { - ActorRef replyActorPath = getSelf(); - if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) { - LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId()); - replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props( - ready.getTransactionID())); - } - - ReadyTransactionReply readyTransactionReply = - new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath), - ready.getTxnClientVersion()); - getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : - readyTransactionReply, getSelf()); - } else { - getSender().tell(READY_TRANSACTION_REPLY, getSelf()); - } - } - private void handleAbortTransaction(final AbortTransaction abort) { doAbortTransaction(abort.getTransactionID(), getSender()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index b96e38d76a..4ff9b5fd43 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -21,11 +21,15 @@ import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; -import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.slf4j.Logger; @@ -56,8 +60,6 @@ public class ShardCommitCoordinator { private final String name; - private final String shardActorPath; - private final RemovalListener cacheRemovalListener = new RemovalListener() { @Override @@ -71,6 +73,8 @@ public class ShardCommitCoordinator { // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts. private CohortDecorator cohortDecorator; + private ReadyTransactionReply readyTransactionReply; + public ShardCommitCoordinator(DOMTransactionFactory transactionFactory, long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) { @@ -79,8 +83,6 @@ public class ShardCommitCoordinator { this.name = name; this.transactionFactory = transactionFactory; - shardActorPath = Serialization.serializedActorPath(shardActor); - cohortCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS). removalListener(cacheRemovalListener).build(); @@ -93,18 +95,55 @@ public class ShardCommitCoordinator { this.queueCapacity = queueCapacity; } + private ReadyTransactionReply readyTransactionReply(Shard shard) { + if(readyTransactionReply == null) { + readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self())); + } + + return readyTransactionReply; + } + /** * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit. - * - * @param transactionID the ID of the transaction - * @param cohort the cohort to participate in the transaction commit - * @param modification the modifications made by the transaction */ - public void transactionReady(String transactionID, DOMStoreThreePhaseCommitCohort cohort, - MutableCompositeModification modification) { + public void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) { + log.debug("{}: Readying transaction {}, client version {}", name, + ready.getTransactionID(), ready.getTxnClientVersion()); + + CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), ready.getCohort(), + (MutableCompositeModification) ready.getModification()); + cohortCache.put(ready.getTransactionID(), cohortEntry); + + if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) { + // Return our actor path as we'll handle the three phase commit except if the Tx client + // version < Helium-1 version which means the Tx was initiated by a base Helium version node. + // In that case, the subsequent 3-phase commit messages won't contain the transactionId so to + // maintain backwards compatibility, we create a separate cohort actor to provide the compatible behavior. + ActorRef replyActorPath = shard.self(); + if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) { + log.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", name); + replyActorPath = shard.getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props( + ready.getTransactionID())); + } - cohortCache.put(transactionID, new CohortEntry(transactionID, cohort, modification)); + ReadyTransactionReply readyTransactionReply = + new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath), + ready.getTxnClientVersion()); + sender.tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : + readyTransactionReply, shard.self()); + } else { + if(ready.isDoImmediateCommit()) { + cohortEntry.setDoImmediateCommit(true); + cohortEntry.setReplySender(sender); + cohortEntry.setShard(shard); + handleCanCommit(cohortEntry); + } else { + // The caller does not want immediate commit - the 3-phase commit will be coordinated by the + // front-end so send back a ReadyTransactionReply with our actor path. + sender.tell(readyTransactionReply(shard), shard.self()); + } + } } /** @@ -118,7 +157,7 @@ public class ShardCommitCoordinator { * * @throws ExecutionException if an error occurs loading the cache */ - public boolean handleTransactionModifications(BatchedModifications batched) + boolean handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) throws ExecutionException { CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID()); if(cohortEntry == null) { @@ -142,43 +181,30 @@ public class ShardCommitCoordinator { batched.getTransactionID(), batched.getVersion()); } - cohortEntry.ready(cohortDecorator); + cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady()); + + if(batched.isDoCommitOnReady()) { + cohortEntry.setReplySender(sender); + cohortEntry.setShard(shard); + handleCanCommit(cohortEntry); + } else { + sender.tell(readyTransactionReply(shard), shard.self()); + } + } else { + sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self()); } return batched.isReady(); } - /** - * This method handles the canCommit phase for a transaction. - * - * @param canCommit the CanCommitTransaction message - * @param sender the actor that sent the message - * @param shard the transaction's shard actor - */ - public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender, - final ActorRef shard) { - String transactionID = canCommit.getTransactionID(); + private void handleCanCommit(CohortEntry cohortEntry) { + String transactionID = cohortEntry.getTransactionID(); + if(log.isDebugEnabled()) { log.debug("{}: Processing canCommit for transaction {} for shard {}", - name, transactionID, shard.path()); - } - - // Lookup the cohort entry that was cached previously (or should have been) by - // transactionReady (via the ForwardedReadyTransaction message). - final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID); - if(cohortEntry == null) { - // Either canCommit was invoked before ready(shouldn't happen) or a long time passed - // between canCommit and ready and the entry was expired from the cache. - IllegalStateException ex = new IllegalStateException( - String.format("%s: No cohort entry found for transaction %s", name, transactionID)); - log.error(ex.getMessage()); - sender.tell(new Status.Failure(ex), shard); - return; + name, transactionID, cohortEntry.getShard().self().path()); } - cohortEntry.setCanCommitSender(sender); - cohortEntry.setShard(shard); - if(currentCohortEntry != null) { // There's already a Tx commit in progress - attempt to queue this entry to be // committed after the current Tx completes. @@ -195,7 +221,7 @@ public class ShardCommitCoordinator { " capacity %d has been reached.", name, transactionID, queueCapacity)); log.error(ex.getMessage()); - sender.tell(new Status.Failure(ex), shard); + cohortEntry.getReplySender().tell(new Status.Failure(ex), cohortEntry.getShard().self()); } } else { // No Tx commit currently in progress - make this the current entry and proceed with @@ -207,29 +233,119 @@ public class ShardCommitCoordinator { } } + /** + * This method handles the canCommit phase for a transaction. + * + * @param canCommit the CanCommitTransaction message + * @param sender the actor that sent the message + * @param shard the transaction's shard actor + */ + public void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) { + // Lookup the cohort entry that was cached previously (or should have been) by + // transactionReady (via the ForwardedReadyTransaction message). + final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID); + if(cohortEntry == null) { + // Either canCommit was invoked before ready(shouldn't happen) or a long time passed + // between canCommit and ready and the entry was expired from the cache. + IllegalStateException ex = new IllegalStateException( + String.format("%s: No cohort entry found for transaction %s", name, transactionID)); + log.error(ex.getMessage()); + sender.tell(new Status.Failure(ex), shard.self()); + return; + } + + cohortEntry.setReplySender(sender); + cohortEntry.setShard(shard); + + handleCanCommit(cohortEntry); + } + private void doCanCommit(final CohortEntry cohortEntry) { + boolean canCommit = false; try { // We block on the future here so we don't have to worry about possibly accessing our // state on a different thread outside of our dispatcher. Also, the data store // currently uses a same thread executor anyway. - Boolean canCommit = cohortEntry.getCohort().canCommit().get(); + canCommit = cohortEntry.getCohort().canCommit().get(); + + if(cohortEntry.isDoImmediateCommit()) { + if(canCommit) { + doCommit(cohortEntry); + } else { + cohortEntry.getReplySender().tell(new Status.Failure(new TransactionCommitFailedException( + "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self()); + } + } else { + cohortEntry.getReplySender().tell( + canCommit ? CanCommitTransactionReply.YES.toSerializable() : + CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self()); + } + } catch (Exception e) { + log.debug("{}: An exception occurred during canCommit: {}", name, e); - cohortEntry.getCanCommitSender().tell( - canCommit ? CanCommitTransactionReply.YES.toSerializable() : - CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard()); + Throwable failure = e; + if(e instanceof ExecutionException) { + failure = e.getCause(); + } + cohortEntry.getReplySender().tell(new Status.Failure(failure), cohortEntry.getShard().self()); + } finally { if(!canCommit) { - // Remove the entry from the cache now since the Tx will be aborted. - removeCohortEntry(cohortEntry.getTransactionID()); + // Remove the entry from the cache now. + currentTransactionComplete(cohortEntry.getTransactionID(), true); } - } catch (InterruptedException | ExecutionException e) { - log.debug("{}: An exception occurred during canCommit: {}", name, e); + } + } + + private boolean doCommit(CohortEntry cohortEntry) { + log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID()); - // Remove the entry from the cache now since the Tx will be aborted. - removeCohortEntry(cohortEntry.getTransactionID()); - cohortEntry.getCanCommitSender().tell(new Status.Failure(e), cohortEntry.getShard()); + boolean success = false; + + // We perform the preCommit phase here atomically with the commit phase. This is an + // optimization to eliminate the overhead of an extra preCommit message. We lose front-end + // coordination of preCommit across shards in case of failure but preCommit should not + // normally fail since we ensure only one concurrent 3-phase commit. + + try { + // We block on the future here so we don't have to worry about possibly accessing our + // state on a different thread outside of our dispatcher. Also, the data store + // currently uses a same thread executor anyway. + cohortEntry.getCohort().preCommit().get(); + + cohortEntry.getShard().continueCommit(cohortEntry); + + cohortEntry.updateLastAccessTime(); + + success = true; + } catch (Exception e) { + log.error("{} An exception occurred while preCommitting transaction {}", + name, cohortEntry.getTransactionID(), e); + cohortEntry.getReplySender().tell(new akka.actor.Status.Failure(e), cohortEntry.getShard().self()); + + currentTransactionComplete(cohortEntry.getTransactionID(), true); + } + + return success; + } + + boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) { + // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to + // this transaction. + final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID); + if(cohortEntry == null) { + // We're not the current Tx - the Tx was likely expired b/c it took too long in + // between the canCommit and commit messages. + IllegalStateException ex = new IllegalStateException( + String.format("%s: Cannot commit transaction %s - it is not the current transaction", + name, transactionID)); + log.error(ex.getMessage()); + sender.tell(new akka.actor.Status.Failure(ex), shard.self()); + return false; } + + return doCommit(cohortEntry); } /** @@ -302,9 +418,10 @@ public class ShardCommitCoordinator { private DOMStoreThreePhaseCommitCohort cohort; private final MutableCompositeModification compositeModification; private final DOMStoreWriteTransaction transaction; - private ActorRef canCommitSender; - private ActorRef shard; + private ActorRef replySender; + private Shard shard; private long lastAccessTime; + private boolean doImmediateCommit; CohortEntry(String transactionID, DOMStoreWriteTransaction transaction) { this.compositeModification = new MutableCompositeModification(); @@ -347,9 +464,11 @@ public class ShardCommitCoordinator { } } - void ready(CohortDecorator cohortDecorator) { + void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) { Preconditions.checkState(cohort == null, "cohort was already set"); + setDoImmediateCommit(doImmediateCommit); + cohort = transaction.ready(); if(cohortDecorator != null) { @@ -358,19 +477,27 @@ public class ShardCommitCoordinator { } } - ActorRef getCanCommitSender() { - return canCommitSender; + boolean isDoImmediateCommit() { + return doImmediateCommit; + } + + void setDoImmediateCommit(boolean doImmediateCommit) { + this.doImmediateCommit = doImmediateCommit; + } + + ActorRef getReplySender() { + return replySender; } - void setCanCommitSender(ActorRef canCommitSender) { - this.canCommitSender = canCommitSender; + void setReplySender(ActorRef replySender) { + this.replySender = replySender; } - ActorRef getShard() { + Shard getShard() { return shard; } - void setShard(ActorRef shard) { + void setShard(Shard shard) { this.shard = shard; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java index 1d5b1d8e1b..424ab2052c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java @@ -61,9 +61,9 @@ public class ShardWriteTransaction extends ShardTransaction { if (message instanceof BatchedModifications) { batchedModifications((BatchedModifications)message); } else if (message instanceof ReadyTransaction) { - readyTransaction(transaction, !SERIALIZED_REPLY); + readyTransaction(transaction, !SERIALIZED_REPLY, false); } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) { - readyTransaction(transaction, SERIALIZED_REPLY); + readyTransaction(transaction, SERIALIZED_REPLY, false); } else if(WriteData.isSerializedType(message)) { writeData(transaction, WriteData.fromSerializable(message), SERIALIZED_REPLY); @@ -100,7 +100,7 @@ public class ShardWriteTransaction extends ShardTransaction { totalBatchedModificationsReceived, batched.getTotalMessagesSent())); } - readyTransaction(transaction, false); + readyTransaction(transaction, false, batched.isDoCommitOnReady()); } else { getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf()); } @@ -162,7 +162,8 @@ public class ShardWriteTransaction extends ShardTransaction { } } - private void readyTransaction(DOMStoreWriteTransaction transaction, boolean returnSerialized) { + private void readyTransaction(DOMStoreWriteTransaction transaction, boolean returnSerialized, + boolean doImmediateCommit) { String transactionID = getTransactionID(); LOG.debug("readyTransaction : {}", transactionID); @@ -170,7 +171,7 @@ public class ShardWriteTransaction extends ShardTransaction { DOMStoreThreePhaseCommitCohort cohort = transaction.ready(); getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(), - cohort, compositeModification, returnSerialized), getContext()); + cohort, compositeModification, returnSerialized, doImmediateCommit), getContext()); // The shard will handle the commit from here so we're no longer needed - self-destruct. getSelf().tell(PoisonPill.getInstance(), getSelf()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SingleCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SingleCommitCohortProxy.java new file mode 100644 index 0000000000..e340859321 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SingleCommitCohortProxy.java @@ -0,0 +1,122 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorSelection; +import akka.dispatch.Futures; +import akka.dispatch.OnComplete; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import java.util.Arrays; +import java.util.List; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; + +/** + * A cohort proxy implementation for a single-shard transaction commit. If the transaction was a direct commit + * to the shard, this implementation elides the CanCommitTransaction and CommitTransaction messages to the + * shard as an optimization. Otherwise the 3-phase commit to the shard is delegated to a + * ThreePhaseCommitCohortProxy instance (this is for backwards compatibility with pre-Lithium versions). + * + * @author Thomas Pantelis + */ +class SingleCommitCohortProxy extends AbstractThreePhaseCommitCohort { + private static final Logger LOG = LoggerFactory.getLogger(SingleCommitCohortProxy.class); + + private final ActorContext actorContext; + private final Future cohortFuture; + private final String transactionId; + private volatile DOMStoreThreePhaseCommitCohort delegateCohort = NoOpDOMStoreThreePhaseCommitCohort.INSTANCE; + private final OperationCallback.Reference operationCallbackRef; + + SingleCommitCohortProxy(ActorContext actorContext, Future cohortFuture, String transactionId, + OperationCallback.Reference operationCallbackRef) { + this.actorContext = actorContext; + this.cohortFuture = cohortFuture; + this.transactionId = transactionId; + this.operationCallbackRef = operationCallbackRef; + } + + @Override + public ListenableFuture canCommit() { + LOG.debug("Tx {} canCommit", transactionId); + + final SettableFuture returnFuture = SettableFuture.create(); + + cohortFuture.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object cohortResponse) { + if(failure != null) { + operationCallbackRef.get().failure(); + returnFuture.setException(failure); + return; + } + + operationCallbackRef.get().success(); + + if(cohortResponse instanceof ActorSelection) { + handlePreLithiumActorCohort((ActorSelection)cohortResponse, returnFuture); + return; + } + + LOG.debug("Tx {} successfully completed direct commit", transactionId); + + // The Future was the result of a direct commit to the shard, essentially eliding the + // front-end 3PC coordination. We don't really care about the specific Future + // response object, only that it completed successfully. At this point the Tx is complete + // so return true. The subsequent preCommit and commit phases will be no-ops, ie return + // immediate success, to complete the 3PC for the front-end. + returnFuture.set(Boolean.TRUE); + } + }, actorContext.getClientDispatcher()); + + return returnFuture; + } + + @Override + public ListenableFuture preCommit() { + return delegateCohort.preCommit(); + } + + @Override + public ListenableFuture abort() { + return delegateCohort.abort(); + } + + @Override + public ListenableFuture commit() { + return delegateCohort.commit(); + } + + @Override + List> getCohortFutures() { + return Arrays.asList(cohortFuture); + } + + private void handlePreLithiumActorCohort(ActorSelection actorSelection, final SettableFuture returnFuture) { + // Handle backwards compatibility. An ActorSelection response would be returned from a + // pre-Lithium version. In this case delegate to a ThreePhaseCommitCohortProxy. + delegateCohort = new ThreePhaseCommitCohortProxy(actorContext, + Arrays.asList(Futures.successful(actorSelection)), transactionId); + com.google.common.util.concurrent.Futures.addCallback(delegateCohort.canCommit(), new FutureCallback() { + @Override + public void onSuccess(Boolean canCommit) { + returnFuture.set(canCommit); + } + + @Override + public void onFailure(Throwable t) { + returnFuture.setException(t); + } + }); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index 3a2bcf2336..57749a1a73 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -32,30 +32,14 @@ import scala.runtime.AbstractFunction1; /** * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies */ -public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort { +public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort { private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class); - private static final ListenableFuture IMMEDIATE_SUCCESS = - com.google.common.util.concurrent.Futures.immediateFuture(null); - private final ActorContext actorContext; private final List> cohortFutures; private volatile List cohorts; private final String transactionId; - private static final OperationCallback NO_OP_CALLBACK = new OperationCallback() { - @Override - public void run() { - } - - @Override - public void success() { - } - - @Override - public void failure() { - } - }; public ThreePhaseCommitCohortProxy(ActorContext actorContext, List> cohortFutures, String transactionId) { @@ -190,7 +174,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort public ListenableFuture preCommit() { // We don't need to do anything here - preCommit is done atomically with the commit phase // by the shard. - return IMMEDIATE_SUCCESS; + return IMMEDIATE_VOID_SUCCESS; } @Override @@ -207,7 +191,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort @Override public ListenableFuture commit() { - OperationCallback operationCallback = cohortFutures.isEmpty() ? NO_OP_CALLBACK : + OperationCallback operationCallback = cohortFutures.isEmpty() ? OperationCallback.NO_OP_CALLBACK : new TransactionRateLimitingCallback(actorContext); return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(), @@ -216,7 +200,8 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort private ListenableFuture voidOperation(final String operationName, final Object message, final Class expectedResponseClass, final boolean propagateException) { - return voidOperation(operationName, message, expectedResponseClass, propagateException, NO_OP_CALLBACK); + return voidOperation(operationName, message, expectedResponseClass, propagateException, + OperationCallback.NO_OP_CALLBACK); } private ListenableFuture voidOperation(final String operationName, final Object message, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index 11066edd54..5531b5f540 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -8,7 +8,6 @@ package org.opendaylight.controller.cluster.datastore; -import akka.actor.ActorSelection; import com.google.common.base.Preconditions; import java.util.Collections; import java.util.List; @@ -30,7 +29,7 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { private interface State { boolean isReady(); - List> getPreviousReadyFutures(); + List> getPreviousReadyFutures(); } private static class Allocated implements State { @@ -46,14 +45,14 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { } @Override - public List> getPreviousReadyFutures() { + public List> getPreviousReadyFutures() { return transaction.getReadyFutures(); } } private static abstract class AbstractDefaultState implements State { @Override - public List> getPreviousReadyFutures() { + public List> getPreviousReadyFutures() { return Collections.emptyList(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java index bc6e5f229f..4eea785964 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java @@ -32,4 +32,8 @@ interface TransactionContext { void readData(final YangInstanceIdentifier path, SettableFuture>> proxyFuture); void dataExists(YangInstanceIdentifier path, SettableFuture proxyFuture); + + boolean supportsDirectCommit(); + + Future directCommit(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java index c722918c5c..a9deeaaeba 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java @@ -89,13 +89,27 @@ public class TransactionContextImpl extends AbstractTransactionContext { actorContext.sendOperationAsync(getActor(), CloseTransaction.INSTANCE.toSerializable()); } + @Override + public boolean supportsDirectCommit() { + return true; + } + + @Override + public Future directCommit() { + LOG.debug("Tx {} directCommit called", getIdentifier()); + + // Send the remaining batched modifications, if any, with the ready flag set. + + return sendBatchedModifications(true, true); + } + @Override public Future readyTransaction() { LOG.debug("Tx {} readyTransaction called", getIdentifier()); // Send the remaining batched modifications, if any, with the ready flag set. - Future lastModificationsFuture = sendBatchedModifications(true); + Future lastModificationsFuture = sendBatchedModifications(true, false); return transformReadyReply(lastModificationsFuture); } @@ -145,10 +159,10 @@ public class TransactionContextImpl extends AbstractTransactionContext { } protected Future sendBatchedModifications() { - return sendBatchedModifications(false); + return sendBatchedModifications(false, false); } - protected Future sendBatchedModifications(boolean ready) { + protected Future sendBatchedModifications(boolean ready, boolean doCommitOnReady) { Future sent = null; if(ready || (batchedModifications != null && !batchedModifications.getModifications().isEmpty())) { if(batchedModifications == null) { @@ -161,6 +175,7 @@ public class TransactionContextImpl extends AbstractTransactionContext { } batchedModifications.setReady(ready); + batchedModifications.setDoCommitOnReady(doCommitOnReady); batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent); sent = executeOperationAsync(batchedModifications); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 71799c92d4..f12fdd99ea 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -354,7 +354,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction ready() { Preconditions.checkState(transactionType != TransactionType.READ_ONLY, "Read-only transactions cannot be readied"); @@ -371,10 +371,55 @@ public class TransactionProxy extends AbstractDOMStoreTransaction createSingleCommitCohort() { + TransactionFutureCallback txFutureCallback = txFutureCallbackMap.values().iterator().next(); + + LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(), + txFutureCallback.getShardName(), transactionChainId); + + final OperationCallback.Reference operationCallbackRef = + new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK); + final TransactionContext transactionContext = txFutureCallback.getTransactionContext(); + final Future future; + if (transactionContext != null) { + // avoid the creation of a promise and a TransactionOperation + future = getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef); + } else { + final Promise promise = akka.dispatch.Futures.promise(); + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + promise.completeWith(getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef)); + } + }); + future = promise.future(); + } + + return new SingleCommitCohortProxy(actorContext, future, getIdentifier().toString(), operationCallbackRef); + } + + private Future getReadyOrDirectCommitFuture(TransactionContext transactionContext, + OperationCallback.Reference operationCallbackRef) { + if(transactionContext.supportsDirectCommit()) { + TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(actorContext); + operationCallbackRef.set(rateLimitingCallback); + rateLimitingCallback.run(); + return transactionContext.directCommit(); + } else { + return transactionContext.readyTransaction(); + } + } + + private AbstractThreePhaseCommitCohort createMultiCommitCohort() { List> cohortFutures = new ArrayList<>(txFutureCallbackMap.size()); for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(), + LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(), txFutureCallback.getShardName(), transactionChainId); final TransactionContext transactionContext = txFutureCallback.getTransactionContext(); @@ -396,8 +441,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction directCommit() { + throw new UnsupportedOperationException("directCommit is not supported for " + getClass()); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java index 86f96f57d0..f95473f8a6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java @@ -22,6 +22,7 @@ public class BatchedModifications extends MutableCompositeModification implement private static final long serialVersionUID = 1L; private boolean ready; + private boolean doCommitOnReady; private int totalMessagesSent; private String transactionID; private String transactionChainID; @@ -43,6 +44,14 @@ public class BatchedModifications extends MutableCompositeModification implement this.ready = ready; } + public boolean isDoCommitOnReady() { + return doCommitOnReady; + } + + public void setDoCommitOnReady(boolean doCommitOnReady) { + this.doCommitOnReady = doCommitOnReady; + } + public int getTotalMessagesSent() { return totalMessagesSent; } @@ -66,6 +75,7 @@ public class BatchedModifications extends MutableCompositeModification implement transactionChainID = in.readUTF(); ready = in.readBoolean(); totalMessagesSent = in.readInt(); + doCommitOnReady = in.readBoolean(); } @Override @@ -75,6 +85,7 @@ public class BatchedModifications extends MutableCompositeModification implement out.writeUTF(transactionChainID); out.writeBoolean(ready); out.writeInt(totalMessagesSent); + out.writeBoolean(doCommitOnReady); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java index 0f87243059..cdd7859a30 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java @@ -20,16 +20,18 @@ public class ForwardedReadyTransaction { private final DOMStoreThreePhaseCommitCohort cohort; private final Modification modification; private final boolean returnSerialized; + private final boolean doImmediateCommit; private final short txnClientVersion; public ForwardedReadyTransaction(String transactionID, short txnClientVersion, DOMStoreThreePhaseCommitCohort cohort, Modification modification, - boolean returnSerialized) { + boolean returnSerialized, boolean doImmediateCommit) { this.transactionID = transactionID; this.cohort = cohort; this.modification = modification; this.returnSerialized = returnSerialized; this.txnClientVersion = txnClientVersion; + this.doImmediateCommit = doImmediateCommit; } public String getTransactionID() { @@ -51,4 +53,8 @@ public class ForwardedReadyTransaction { public short getTxnClientVersion() { return txnClientVersion; } + + public boolean isDoImmediateCommit() { + return doImmediateCommit; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java index 34f0164504..f6a103ffb8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java @@ -222,7 +222,11 @@ public abstract class AbstractShardTest extends AbstractActorTest{ doAnswer(new Answer>() { @Override public ListenableFuture answer(final InvocationOnMock invocation) throws Throwable { - return actual.preCommit(); + if(preCommit != null) { + return preCommit.apply(actual); + } else { + return actual.preCommit(); + } } }).when(cohort).preCommit(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java index 6a1e12a96b..be96560814 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java @@ -9,7 +9,6 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.argThat; @@ -24,13 +23,19 @@ import akka.actor.ActorSystem; import akka.actor.Props; import akka.dispatch.Futures; import akka.testkit.JavaTestKit; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import com.google.common.base.Objects; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.CheckedFuture; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import java.io.IOException; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.junit.AfterClass; import org.junit.Before; @@ -45,6 +50,7 @@ import org.opendaylight.controller.cluster.datastore.TransactionProxy.Transactio import org.opendaylight.controller.cluster.datastore.TransactionProxyTest.TestException; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; @@ -55,6 +61,7 @@ import org.opendaylight.controller.cluster.datastore.modification.AbstractModifi import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; +import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; @@ -81,7 +88,24 @@ public abstract class AbstractTransactionProxyTest { private static ActorSystem system; - private final Configuration configuration = new MockConfiguration(); + private final Configuration configuration = new MockConfiguration() { + @Override + public Map getModuleNameToShardStrategyMap() { + return ImmutableMap.builder().put( + "junk", new ShardStrategy() { + @Override + public String findShard(YangInstanceIdentifier path) { + return "junk"; + } + }).build(); + } + + @Override + public Optional getModuleNameFromNameSpace(String nameSpace) { + return TestModel.JUNK_QNAME.getNamespace().toASCIIString().equals(nameSpace) ? + Optional.of("junk") : Optional.absent(); + } + }; @Mock protected ActorContext mockActorContext; @@ -126,6 +150,9 @@ public abstract class AbstractTransactionProxyTest { doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext(); doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit(); + Timer timer = new MetricRegistry().timer("test"); + doReturn(timer).when(mockActorContext).getOperationTimer(any(String.class)); + ShardStrategyFactory.setConfiguration(configuration); } @@ -246,8 +273,13 @@ public abstract class AbstractTransactionProxyTest { } protected void expectBatchedModificationsReady(ActorRef actorRef) { - doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(BatchedModifications.class)); + expectBatchedModificationsReady(actorRef, false); + } + + protected void expectBatchedModificationsReady(ActorRef actorRef, boolean doCommitOnReady) { + doReturn(doCommitOnReady ? Futures.successful(new CommitTransactionReply().toSerializable()) : + readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(BatchedModifications.class)); } protected void expectBatchedModifications(int count) { @@ -274,6 +306,10 @@ public abstract class AbstractTransactionProxyTest { } protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) { + return setupActorContextWithoutInitialCreateTransaction(actorSystem, DefaultShardStrategy.DEFAULT_SHARD); + } + + protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem, String shardName) { ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); log.info("Created mock shard actor {}", actorRef); @@ -281,7 +317,7 @@ public abstract class AbstractTransactionProxyTest { when(mockActorContext).actorSelection(actorRef.path().toString()); doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))). - when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); + when(mockActorContext).findPrimaryShardAsync(eq(shardName)); doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString()); @@ -291,8 +327,8 @@ public abstract class AbstractTransactionProxyTest { } protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, - TransactionType type, int transactionVersion) { - ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem); + TransactionType type, int transactionVersion, String shardName) { + ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem, shardName); return setupActorContextWithInitialCreateTransaction(actorSystem, type, transactionVersion, memberName, shardActorRef); @@ -321,9 +357,15 @@ public abstract class AbstractTransactionProxyTest { } protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) { - return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION); + return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION, + DefaultShardStrategy.DEFAULT_SHARD); } + protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type, + String shardName) { + return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION, + shardName); + } protected void propagateReadFailedExceptionCause(CheckedFuture future) throws Throwable { @@ -362,14 +404,20 @@ public abstract class AbstractTransactionProxyTest { List batchedModifications = captureBatchedModifications(actorRef); assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); - verifyBatchedModifications(batchedModifications.get(0), expIsReady, expected); + verifyBatchedModifications(batchedModifications.get(0), expIsReady, expIsReady, expected); } protected void verifyBatchedModifications(Object message, boolean expIsReady, Modification... expected) { + verifyBatchedModifications(message, expIsReady, false, expected); + } + + protected void verifyBatchedModifications(Object message, boolean expIsReady, boolean expIsDoCommitOnReady, + Modification... expected) { assertEquals("Message type", BatchedModifications.class, message.getClass()); BatchedModifications batchedModifications = (BatchedModifications)message; assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size()); assertEquals("isReady", expIsReady, batchedModifications.isReady()); + assertEquals("isDoCommitOnReady", expIsDoCommitOnReady, batchedModifications.isDoCommitOnReady()); for(int i = 0; i < batchedModifications.getModifications().size(); i++) { Modification actual = batchedModifications.getModifications().get(i); assertEquals("Modification type", expected[i].getClass(), actual.getClass()); @@ -382,27 +430,44 @@ public abstract class AbstractTransactionProxyTest { } } - protected void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy, + protected void verifyCohortFutures(AbstractThreePhaseCommitCohort proxy, Object... expReplies) throws Exception { assertEquals("getReadyOperationFutures size", expReplies.length, proxy.getCohortFutures().size()); - int i = 0; - for( Future future: proxy.getCohortFutures()) { + List futureResults = new ArrayList<>(); + for( Future future: proxy.getCohortFutures()) { assertNotNull("Ready operation Future is null", future); + try { + futureResults.add(Await.result(future, Duration.create(5, TimeUnit.SECONDS))); + } catch(Exception e) { + futureResults.add(e); + } + } - Object expReply = expReplies[i++]; - if(expReply instanceof ActorSelection) { - ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS)); - assertEquals("Cohort actor path", expReply, actual); - } else { - try { - Await.result(future, Duration.create(5, TimeUnit.SECONDS)); - fail("Expected exception from ready operation Future"); - } catch(Exception e) { - assertTrue(String.format("Expected exception type %s. Actual %s", - expReply, e.getClass()), ((Class)expReply).isInstance(e)); + for(int i = 0; i < expReplies.length; i++) { + Object expReply = expReplies[i]; + boolean found = false; + Iterator iter = futureResults.iterator(); + while(iter.hasNext()) { + Object actual = iter.next(); + if(CommitTransactionReply.SERIALIZABLE_CLASS.isInstance(expReply) && + CommitTransactionReply.SERIALIZABLE_CLASS.isInstance(actual)) { + found = true; + } else if(expReply instanceof ActorSelection && Objects.equal(expReply, actual)) { + found = true; + } else if(expReply instanceof Class && ((Class)expReply).isInstance(actual)) { + found = true; } + + if(found) { + iter.remove(); + break; + } + } + + if(!found) { + fail(String.format("No cohort Future response found for %s. Actual: %s", expReply, futureResults)); } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index a8384d8758..94cc6e5e59 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -147,9 +147,9 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { }}; } - private void testTransactionWritesWithShardNotInitiallyReady(final boolean writeOnly) throws Exception { + private void testTransactionWritesWithShardNotInitiallyReady(final String testName, + final boolean writeOnly) throws Exception { new IntegrationTestKit(getSystem()) {{ - String testName = "testTransactionWritesWithShardNotInitiallyReady"; String shardName = "test-1"; // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't @@ -241,12 +241,12 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Test public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception { datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); - testTransactionWritesWithShardNotInitiallyReady(true); + testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true); } @Test public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception { - testTransactionWritesWithShardNotInitiallyReady(false); + testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false); } @Test @@ -873,7 +873,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { } void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception { - Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS); + Boolean canCommit = cohort.canCommit().get(7, TimeUnit.SECONDS); assertEquals("canCommit", true, canCommit); cohort.preCommit().get(5, TimeUnit.SECONDS); cohort.commit().get(5, TimeUnit.SECONDS); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index e3b82df174..72f672794a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -8,6 +8,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION; import akka.actor.ActorRef; import akka.actor.ActorSelection; @@ -469,7 +470,7 @@ public class ShardTest extends AbstractShardTest { // by the ShardTransaction. shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true), getRef()); + cohort1, modification1, true, false), getRef()); ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable( expectMsgClass(duration, ReadyTransactionReply.class)); assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath()); @@ -484,11 +485,11 @@ public class ShardTest extends AbstractShardTest { // Send the ForwardedReadyTransaction for the next 2 Tx's. shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true), getRef()); + cohort2, modification2, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, - cohort3, modification3, true), getRef()); + cohort3, modification3, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and @@ -595,18 +596,7 @@ public class ShardTest extends AbstractShardTest { // Verify data in the data store. - NormalizedNode outerList = readStore(shard, TestModel.OUTER_LIST_PATH); - assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList); - assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable", - outerList.getValue() instanceof Iterable); - Object entry = ((Iterable)outerList.getValue()).iterator().next(); - assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode", - entry instanceof MapEntryNode); - MapEntryNode mapEntry = (MapEntryNode)entry; - Optional> idLeaf = - mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME)); - assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent()); - assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue()); + verifyOuterListEntry(shard, 1); verifyLastApplied(shard, 2); @@ -615,25 +605,25 @@ public class ShardTest extends AbstractShardTest { } private BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path, - NormalizedNode data, boolean ready) { - return newBatchedModifications(transactionID, null, path, data, ready); + NormalizedNode data, boolean ready, boolean doCommitOnReady) { + return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady); } private BatchedModifications newBatchedModifications(String transactionID, String transactionChainID, - YangInstanceIdentifier path, NormalizedNode data, boolean ready) { + YangInstanceIdentifier path, NormalizedNode data, boolean ready, boolean doCommitOnReady) { BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID); batched.addModification(new WriteModification(path, data)); batched.setReady(ready); + batched.setDoCommitOnReady(doCommitOnReady); return batched; } - @SuppressWarnings("unchecked") @Test - public void testMultipleBatchedModifications() throws Throwable { + public void testBatchedModificationsWithNoCommitOnReady() throws Throwable { new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testMultipleBatchedModifications"); + "testBatchedModificationsWithNoCommitOnReady"); waitUntilLeader(shard); @@ -657,18 +647,18 @@ public class ShardTest extends AbstractShardTest { // Send a BatchedModifications to start a transaction. shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef()); + ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef()); expectMsgClass(duration, BatchedModificationsReply.class); // Send a couple more BatchedModifications. shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), getRef()); + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef()); expectMsgClass(duration, BatchedModificationsReply.class); shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder( TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef()); + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -690,23 +680,85 @@ public class ShardTest extends AbstractShardTest { // Verify data in the data store. - NormalizedNode outerList = readStore(shard, TestModel.OUTER_LIST_PATH); - assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList); - assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable", - outerList.getValue() instanceof Iterable); - Object entry = ((Iterable)outerList.getValue()).iterator().next(); - assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode", - entry instanceof MapEntryNode); - MapEntryNode mapEntry = (MapEntryNode)entry; - Optional> idLeaf = - mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME)); - assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent()); - assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue()); + verifyOuterListEntry(shard, 1); shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } + @Test + public void testBatchedModificationsWithCommitOnReady() throws Throwable { + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testBatchedModificationsWithCommitOnReady"); + + waitUntilLeader(shard); + + final String transactionID = "tx"; + FiniteDuration duration = duration("5 seconds"); + + final AtomicReference mockCohort = new AtomicReference<>(); + ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { + @Override + public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) { + if(mockCohort.get() == null) { + mockCohort.set(createDelegatingMockCohort("cohort", actual)); + } + + return mockCohort.get(); + } + }; + + shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); + + // Send a BatchedModifications to start a transaction. + + shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); + + // Send a couple more BatchedModifications. + + shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); + + shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder( + TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true), getRef()); + + expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + + InOrder inOrder = inOrder(mockCohort.get()); + inOrder.verify(mockCohort.get()).canCommit(); + inOrder.verify(mockCohort.get()).preCommit(); + inOrder.verify(mockCohort.get()).commit(); + + // Verify data in the data store. + + verifyOuterListEntry(shard, 1); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + + @SuppressWarnings("unchecked") + private void verifyOuterListEntry(final TestActorRef shard, Object expIDValue) throws Exception { + NormalizedNode outerList = readStore(shard, TestModel.OUTER_LIST_PATH); + assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList); + assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable", + outerList.getValue() instanceof Iterable); + Object entry = ((Iterable)outerList.getValue()).iterator().next(); + assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode", + entry instanceof MapEntryNode); + MapEntryNode mapEntry = (MapEntryNode)entry; + Optional> idLeaf = + mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME)); + assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent()); + assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue()); + } + @Test public void testBatchedModificationsOnTransactionChain() throws Throwable { new ShardTestKit(getSystem()) {{ @@ -727,7 +779,7 @@ public class ShardTest extends AbstractShardTest { ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); YangInstanceIdentifier path = TestModel.TEST_PATH; shard.tell(newBatchedModifications(transactionID1, transactionChainID, path, - containerNode, true), getRef()); + containerNode, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Create a read Tx on the same chain. @@ -800,6 +852,45 @@ public class ShardTest extends AbstractShardTest { }}; } + @Test + public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{ + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testForwardedReadyTransactionWithImmediateCommit"); + + waitUntilLeader(shard); + + InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + + String transactionID = "tx1"; + MutableCompositeModification modification = new MutableCompositeModification(); + NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore, + TestModel.TEST_PATH, containerNode, modification); + + FiniteDuration duration = duration("5 seconds"); + + // Simulate the ForwardedReadyTransaction messages that would be sent + // by the ShardTransaction. + + shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, + cohort, modification, true, true), getRef()); + + expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class); + + InOrder inOrder = inOrder(cohort); + inOrder.verify(cohort).canCommit(); + inOrder.verify(cohort).preCommit(); + inOrder.verify(cohort).commit(); + + NormalizedNode actualNode = readStore(shard, TestModel.TEST_PATH); + assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + @Test public void testCommitWithPersistenceDisabled() throws Throwable { dataStoreContextBuilder.persistent(false); @@ -826,7 +917,7 @@ public class ShardTest extends AbstractShardTest { // by the ShardTransaction. shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true), getRef()); + cohort, modification, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -878,7 +969,7 @@ public class ShardTest extends AbstractShardTest { // by the ShardTransaction. shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true), getRef()); + cohort, modification, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -933,7 +1024,7 @@ public class ShardTest extends AbstractShardTest { // by the ShardTransaction. shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true), getRef()); + cohort, modification, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -973,7 +1064,7 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); - // Setup 2 simulated transactions with mock cohorts. The first one fails in the + // Setup 2 simulated transactions with mock cohorts. The first one fails in the // commit phase. String transactionID1 = "tx1"; @@ -995,11 +1086,11 @@ public class ShardTest extends AbstractShardTest { // by the ShardTransaction. shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true), getRef()); + cohort1, modification1, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true), getRef()); + cohort2, modification2, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message for the first Tx. @@ -1052,37 +1143,66 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); - String transactionID = "tx1"; - MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); - doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); - doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit(); + String transactionID1 = "tx1"; + MutableCompositeModification modification1 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); + doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit(); + + String transactionID2 = "tx2"; + MutableCompositeModification modification2 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); + doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); FiniteDuration duration = duration("5 seconds"); + final Timeout timeout = new Timeout(duration); // Simulate the ForwardedReadyTransaction messages that would be sent // by the ShardTransaction. - shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); - // Send the CanCommitTransaction message. + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true, false), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); + // Send the CanCommitTransaction message for the first Tx. + + shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef()); CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); assertEquals("Can commit", true, canCommitReply.getCanCommit()); - // Send the CommitTransaction message. This should send back an error - // for preCommit failure. + // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and + // processed after the first Tx completes. - shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef()); + Future canCommitFuture = Patterns.ask(shard, + new CanCommitTransaction(transactionID2).toSerializable(), timeout); + + // Send the CommitTransaction message for the first Tx. This should send back an error + // and trigger the 2nd Tx to proceed. + + shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef()); expectMsgClass(duration, akka.actor.Status.Failure.class); - InOrder inOrder = inOrder(cohort); - inOrder.verify(cohort).canCommit(); - inOrder.verify(cohort).preCommit(); + // Wait for the 2nd Tx to complete the canCommit phase. + + final CountDownLatch latch = new CountDownLatch(1); + canCommitFuture.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable t, final Object resp) { + latch.countDown(); + } + }, getSystem().dispatcher()); + + assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS)); + + InOrder inOrder = inOrder(cohort1, cohort2); + inOrder.verify(cohort1).canCommit(); + inOrder.verify(cohort1).preCommit(); + inOrder.verify(cohort2).canCommit(); shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; @@ -1099,7 +1219,7 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); - String transactionID = "tx1"; + String transactionID1 = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit(); @@ -1107,15 +1227,165 @@ public class ShardTest extends AbstractShardTest { // Simulate the ForwardedReadyTransaction messages that would be sent // by the ShardTransaction. - shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort, modification, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. - shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); + shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef()); expectMsgClass(duration, akka.actor.Status.Failure.class); + // Send another can commit to ensure the failed one got cleaned up. + + reset(cohort); + + String transactionID2 = "tx2"; + doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); + + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort, modification, true, false), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef()); + CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable( + expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS)); + assertEquals("getCanCommit", true, reply.getCanCommit()); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + + @Test + public void testCanCommitPhaseFalseResponse() throws Throwable { + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testCanCommitPhaseFalseResponse"); + + waitUntilLeader(shard); + + final FiniteDuration duration = duration("5 seconds"); + + String transactionID1 = "tx1"; + MutableCompositeModification modification = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit(); + + // Simulate the ForwardedReadyTransaction messages that would be sent + // by the ShardTransaction. + + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort, modification, true, false), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + // Send the CanCommitTransaction message. + + shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef()); + CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable( + expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS)); + assertEquals("getCanCommit", false, reply.getCanCommit()); + + // Send another can commit to ensure the failed one got cleaned up. + + reset(cohort); + + String transactionID2 = "tx2"; + doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); + + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort, modification, true, false), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef()); + reply = CanCommitTransactionReply.fromSerializable( + expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS)); + assertEquals("getCanCommit", true, reply.getCanCommit()); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + + @Test + public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable { + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testImmediateCommitWithCanCommitPhaseFailure"); + + waitUntilLeader(shard); + + final FiniteDuration duration = duration("5 seconds"); + + String transactionID1 = "tx1"; + MutableCompositeModification modification = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit(); + + // Simulate the ForwardedReadyTransaction messages that would be sent + // by the ShardTransaction. + + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort, modification, true, true), getRef()); + + expectMsgClass(duration, akka.actor.Status.Failure.class); + + // Send another can commit to ensure the failed one got cleaned up. + + reset(cohort); + + String transactionID2 = "tx2"; + doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); + doReturn(Futures.immediateFuture(null)).when(cohort).preCommit(); + doReturn(Futures.immediateFuture(null)).when(cohort).commit(); + + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort, modification, true, true), getRef()); + + expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + + @Test + public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable { + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testImmediateCommitWithCanCommitPhaseFalseResponse"); + + waitUntilLeader(shard); + + final FiniteDuration duration = duration("5 seconds"); + + String transactionID = "tx1"; + MutableCompositeModification modification = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit(); + + // Simulate the ForwardedReadyTransaction messages that would be sent + // by the ShardTransaction. + + shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, + cohort, modification, true, true), getRef()); + + expectMsgClass(duration, akka.actor.Status.Failure.class); + + // Send another can commit to ensure the failed one got cleaned up. + + reset(cohort); + + String transactionID2 = "tx2"; + doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); + doReturn(Futures.immediateFuture(null)).when(cohort).preCommit(); + doReturn(Futures.immediateFuture(null)).when(cohort).commit(); + + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort, modification, true, true), getRef()); + + expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @@ -1159,7 +1429,7 @@ public class ShardTest extends AbstractShardTest { modification, preCommit); shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true), getRef()); + cohort, modification, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); @@ -1224,11 +1494,11 @@ public class ShardTest extends AbstractShardTest { // Ready the Tx's shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true), getRef()); + cohort1, modification1, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true), getRef()); + cohort2, modification2, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // canCommit 1st Tx. We don't send the commit so it should timeout. @@ -1241,6 +1511,11 @@ public class ShardTest extends AbstractShardTest { shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef()); expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS); + // Try to commit the 1st Tx - should fail as it's not the current Tx. + + shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef()); + expectMsgClass(duration, akka.actor.Status.Failure.class); + // Commit the 2nd Tx. shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef()); @@ -1288,15 +1563,15 @@ public class ShardTest extends AbstractShardTest { // Ready the Tx's shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true), getRef()); + cohort1, modification1, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true), getRef()); + cohort2, modification2, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, - cohort3, modification3, true), getRef()); + cohort3, modification3, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // canCommit 1st Tx. @@ -1360,11 +1635,11 @@ public class ShardTest extends AbstractShardTest { // by the ShardTransaction. shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true), getRef()); + cohort1, modification1, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true), getRef()); + cohort2, modification2, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message for the first Tx. diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index c9335f378a..b22001a4da 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -26,6 +26,7 @@ import org.opendaylight.controller.cluster.datastore.messages.BatchedModificatio import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; @@ -412,11 +413,11 @@ public class ShardTransactionTest extends AbstractActorTest { } @Test - public void testOnReceiveBatchedModificationsReady() throws Exception { + public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception { new JavaTestKit(getSystem()) {{ final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(), - "testOnReceiveBatchedModificationsReady"); + "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit"); JavaTestKit watcher = new JavaTestKit(getSystem()); watcher.watch(transaction); @@ -443,6 +444,33 @@ public class ShardTransactionTest extends AbstractActorTest { }}; } + @Test + public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception { + new JavaTestKit(getSystem()) {{ + + final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(), + "testOnReceiveBatchedModificationsReadyWithImmediateCommit"); + + JavaTestKit watcher = new JavaTestKit(getSystem()); + watcher.watch(transaction); + + YangInstanceIdentifier writePath = TestModel.TEST_PATH; + NormalizedNode writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). + withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); + + BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null); + batched.addModification(new WriteModification(writePath, writeData)); + batched.setReady(true); + batched.setDoCommitOnReady(true); + batched.setTotalMessagesSent(1); + + transaction.tell(batched, getRef()); + expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS); + watcher.expectMsgClass(duration("5 seconds"), Terminated.class); + }}; + } + @Test(expected=TestException.class) public void testOnReceiveBatchedModificationsFailure() throws Throwable { new JavaTestKit(getSystem()) {{ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index cc9692bfd9..93c6ddbe73 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -41,6 +41,7 @@ import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundE import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; @@ -326,7 +327,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testWriteAfterAsyncRead() throws Throwable { - ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem()); + ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(), DefaultShardStrategy.DEFAULT_SHARD); Promise createTxPromise = akka.dispatch.Futures.promise(); doReturn(createTxPromise).when(mockActorContext).executeOperationAsync( @@ -460,7 +461,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedReadData()); - expectBatchedModificationsReady(actorRef); + expectBatchedModificationsReady(actorRef, true); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); @@ -470,16 +471,14 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - assertTrue(ready instanceof ThreePhaseCommitCohortProxy); - - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + assertTrue(ready instanceof SingleCommitCohortProxy); - verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable()); List batchedModifications = captureBatchedModifications(actorRef); assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); - verifyBatchedModifications(batchedModifications.get(0), true, + verifyBatchedModifications(batchedModifications.get(0), true, true, new WriteModification(TestModel.TEST_PATH, nodeToWrite)); assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent()); @@ -492,7 +491,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedReadData()); - expectBatchedModificationsReady(actorRef); + expectBatchedModificationsReady(actorRef, true); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); @@ -500,16 +499,36 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - assertTrue(ready instanceof ThreePhaseCommitCohortProxy); - - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + assertTrue(ready instanceof SingleCommitCohortProxy); - verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable()); List batchedModifications = captureBatchedModifications(actorRef); assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); - verifyBatchedModifications(batchedModifications.get(0), true); + verifyBatchedModifications(batchedModifications.get(0), true, true); + } + + @Test + public void testReadyWithMultipleShardWrites() throws Exception { + ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); + + ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk"); + + expectBatchedModificationsReady(actorRef1); + expectBatchedModificationsReady(actorRef2); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + + transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME)); + transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1), + actorSelection(actorRef2)); } @Test @@ -520,7 +539,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - expectBatchedModificationsReady(actorRef); + expectBatchedModificationsReady(actorRef, true); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -528,16 +547,14 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - assertTrue(ready instanceof ThreePhaseCommitCohortProxy); - - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + assertTrue(ready instanceof SingleCommitCohortProxy); - verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable()); List batchedModifications = captureBatchedModifications(actorRef); assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); - verifyBatchedModifications(batchedModifications.get(0), true, + verifyBatchedModifications(batchedModifications.get(0), true, true, new WriteModification(TestModel.TEST_PATH, nodeToWrite)); verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)), @@ -551,7 +568,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - expectBatchedModificationsReady(actorRef); + expectBatchedModificationsReady(actorRef, true); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -559,11 +576,9 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - assertTrue(ready instanceof ThreePhaseCommitCohortProxy); - - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + assertTrue(ready instanceof SingleCommitCohortProxy); - verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable()); List batchedModifications = captureBatchedModifications(actorRef); assertEquals("Captured BatchedModifications count", 2, batchedModifications.size()); @@ -571,7 +586,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(TestModel.TEST_PATH, nodeToWrite)); - verifyBatchedModifications(batchedModifications.get(1), true); + verifyBatchedModifications(batchedModifications.get(1), true, true); verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); @@ -593,11 +608,9 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - assertTrue(ready instanceof ThreePhaseCommitCohortProxy); - - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + assertTrue(ready instanceof SingleCommitCohortProxy); - verifyCohortFutures(proxy, TestException.class); + verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class); } private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception { @@ -615,11 +628,9 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - assertTrue(ready instanceof ThreePhaseCommitCohortProxy); - - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + assertTrue(ready instanceof SingleCommitCohortProxy); - verifyCohortFutures(proxy, toThrow.getClass()); + verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass()); } @Test @@ -640,27 +651,26 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testReadyWithInvalidReplyMessageType() throws Exception { dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); - - NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); - //expectBatchedModifications(actorRef, 1); + ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk"); doReturn(Futures.successful(new Object())).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), - isA(BatchedModifications.class)); + executeOperationAsync(eq(actorSelection(actorRef1)), isA(BatchedModifications.class)); + + expectBatchedModificationsReady(actorRef2); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); - transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME)); + transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); assertTrue(ready instanceof ThreePhaseCommitCohortProxy); - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - - verifyCohortFutures(proxy, IllegalArgumentException.class); + verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2), + IllegalArgumentException.class); } @Test @@ -746,7 +756,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); doReturn(true).when(mockActorContext).isPathLocal(anyString()); - expectBatchedModificationsReady(actorRef); + expectBatchedModificationsReady(actorRef, true); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); @@ -755,11 +765,9 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - assertTrue(ready instanceof ThreePhaseCommitCohortProxy); - - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + assertTrue(ready instanceof SingleCommitCohortProxy); - verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable()); } private static interface TransactionProxyOperation { @@ -1224,8 +1232,8 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1), new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3)); - verifyBatchedModifications(batchedModifications.get(2), true, new MergeModification(mergePath3, mergeNode3), - new DeleteModification(deletePath2)); + verifyBatchedModifications(batchedModifications.get(2), true, true, + new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2)); assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java index 9e1557ae3c..caabb32d71 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java @@ -247,7 +247,7 @@ public class PreLithiumShardTest extends AbstractShardTest { // by the ShardTransaction. shard.tell(new ForwardedReadyTransaction(transactionID1, HELIUM_2_VERSION, - cohort1, modification1, true), getRef()); + cohort1, modification1, true, false), getRef()); ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable( expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS)); assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath()); @@ -262,11 +262,11 @@ public class PreLithiumShardTest extends AbstractShardTest { // Send the ForwardedReadyTransaction for the next 2 Tx's. shard.tell(new ForwardedReadyTransaction(transactionID2, HELIUM_2_VERSION, - cohort2, modification2, true), getRef()); + cohort2, modification2, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); shard.tell(new ForwardedReadyTransaction(transactionID3, HELIUM_2_VERSION, - cohort3, modification3, true), getRef()); + cohort3, modification3, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java index 4cf8b67ddb..ca342b960a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore.compat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; @@ -18,16 +19,22 @@ import static org.opendaylight.controller.cluster.datastore.TransactionProxy.Tra import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY; import akka.actor.ActorRef; import akka.dispatch.Futures; +import akka.util.Timeout; import com.google.common.base.Optional; import java.util.concurrent.TimeUnit; import org.junit.Ignore; import org.junit.Test; import org.mockito.ArgumentMatcher; import org.mockito.Mockito; +import org.opendaylight.controller.cluster.datastore.AbstractThreePhaseCommitCohort; import org.opendaylight.controller.cluster.datastore.AbstractTransactionProxyTest; import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy; import org.opendaylight.controller.cluster.datastore.TransactionProxy; +import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply; import org.opendaylight.controller.cluster.datastore.messages.MergeData; @@ -36,7 +43,9 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.WriteData; import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply; +import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -95,12 +104,37 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest return argThat(matcher); } + private CanCommitTransaction eqCanCommitTransaction(final String transactionID) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return ThreePhaseCommitCohortMessages.CanCommitTransaction.class.equals(argument.getClass()) && + CanCommitTransaction.fromSerializable(argument).getTransactionID().equals(transactionID); + } + }; + + return argThat(matcher); + } + + private CommitTransaction eqCommitTransaction(final String transactionID) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return ThreePhaseCommitCohortMessages.CommitTransaction.class.equals(argument.getClass()) && + CommitTransaction.fromSerializable(argument).getTransactionID().equals(transactionID); + } + }; + + return argThat(matcher); + } + private Future readySerializedTxReply(String path, short version) { return Futures.successful(new ReadyTransactionReply(path, version).toSerializable()); } private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception { - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, version); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, version, + DefaultShardStrategy.DEFAULT_SHARD); NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); @@ -136,13 +170,24 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest transactionProxy.delete(TestModel.TEST_PATH); - DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + AbstractThreePhaseCommitCohort proxy = transactionProxy.ready(); - assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + doReturn(Futures.successful(CanCommitTransactionReply.YES.toSerializable())).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), + eqCanCommitTransaction(transactionProxy.getIdentifier().toString()), any(Timeout.class)); - verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + doReturn(Futures.successful(new CommitTransactionReply().toSerializable())).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), + eqCommitTransaction(transactionProxy.getIdentifier().toString()), any(Timeout.class)); + + Boolean canCommit = proxy.canCommit().get(3, TimeUnit.SECONDS); + assertEquals("canCommit", true, canCommit.booleanValue()); + + proxy.preCommit().get(3, TimeUnit.SECONDS); + + proxy.commit().get(3, TimeUnit.SECONDS); return actorRef; } @@ -169,7 +214,8 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest // creating transaction actors for write-only Tx's. public void testWriteOnlyCompatibilityWithHeliumR2Version() throws Exception { short version = DataStoreVersions.HELIUM_2_VERSION; - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, version); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, version, + DefaultShardStrategy.DEFAULT_SHARD); NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); -- 2.36.6