From 28b2fd303b8e8bc757de6ead454ae06469113b34 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Tue, 19 Jan 2016 16:50:51 -0500 Subject: [PATCH] Remove deprecated PreLithium Tx context classes and related code Change-Id: I023f488f58096eef9173213d75befbe3cfc78da9 Signed-off-by: Tom Pantelis --- .../RemoteTransactionContextSupport.java | 15 +- .../datastore/ShardCommitCoordinator.java | 36 +-- .../datastore/SingleCommitCohortProxy.java | 26 -- ...wardsCompatibleThreePhaseCommitCohort.java | 87 ------ .../PreLithiumTransactionContextImpl.java | 112 -------- ...PreLithiumTransactionReadyReplyMapper.java | 42 --- .../cluster/datastore/utils/ActorContext.java | 24 -- .../AbstractTransactionProxyTest.java | 2 +- .../datastore/ShardTransactionTest.java | 31 --- .../datastore/compat/PreLithiumShardTest.java | 209 --------------- .../PreLithiumTransactionProxyTest.java | 247 ------------------ ...ctionHeliumBackwardsCompatibilityTest.java | 191 -------------- .../datastore/utils/ActorContextTest.java | 47 ---- 13 files changed, 13 insertions(+), 1056 deletions(-) delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/BackwardsCompatibleThreePhaseCommitCohort.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionReadyReplyMapper.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/ShardTransactionHeliumBackwardsCompatibilityTest.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java index 88c797a990..4ce0767d1d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java @@ -14,7 +14,6 @@ import akka.pattern.AskTimeoutException; import akka.util.Timeout; import com.google.common.base.Preconditions; import java.util.concurrent.TimeUnit; -import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; @@ -101,7 +100,7 @@ final class RemoteTransactionContextSupport { void setPrimaryShard(ActorSelection primaryShard, short primaryVersion) { this.primaryShard = primaryShard; - if (getTransactionType() == TransactionType.WRITE_ONLY && primaryVersion >= DataStoreVersions.LITHIUM_VERSION && + if (getTransactionType() == TransactionType.WRITE_ONLY && getActorContext().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) { LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context", getIdentifier(), primaryShard); @@ -249,15 +248,9 @@ final class RemoteTransactionContextSupport { // TxActor is always created where the leader of the shard is. // Check if TxActor is created in the same node boolean isTxActorLocal = getActorContext().isPathLocal(transactionPath); - final TransactionContext ret; - - if (remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) { - ret = new PreLithiumTransactionContextImpl(transactionContextWrapper.getIdentifier(), transactionPath, transactionActor, - getActorContext(), isTxActorLocal, remoteTransactionVersion, transactionContextWrapper.getLimiter()); - } else { - ret = new RemoteTransactionContext(transactionContextWrapper.getIdentifier(), transactionActor, getActorContext(), - isTxActorLocal, remoteTransactionVersion, transactionContextWrapper.getLimiter()); - } + final TransactionContext ret = new RemoteTransactionContext(transactionContextWrapper.getIdentifier(), + transactionActor, getActorContext(), isTxActorLocal, remoteTransactionVersion, + transactionContextWrapper.getLimiter()); if(parent.getType() == TransactionType.READ_ONLY) { TransactionContextCleanup.track(this, ret); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 57c5b1de11..38bf34fd3b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -22,7 +22,6 @@ import java.util.Map; import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; @@ -143,34 +142,15 @@ class ShardCommitCoordinator { return; } - if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) { - // Return our actor path as we'll handle the three phase commit except if the Tx client - // version < Helium-1 version which means the Tx was initiated by a base Helium version node. - // In that case, the subsequent 3-phase commit messages won't contain the transactionId so to - // maintain backwards compatibility, we create a separate cohort actor to provide the compatible behavior. - ActorRef replyActorPath = shard.self(); - if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) { - log.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", name); - replyActorPath = shard.getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props( - ready.getTransactionID())); - } - - ReadyTransactionReply readyTransactionReply = - new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath), - ready.getTxnClientVersion()); - sender.tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : - readyTransactionReply, shard.self()); + if(ready.isDoImmediateCommit()) { + cohortEntry.setDoImmediateCommit(true); + cohortEntry.setReplySender(sender); + cohortEntry.setShard(shard); + handleCanCommit(cohortEntry); } else { - if(ready.isDoImmediateCommit()) { - cohortEntry.setDoImmediateCommit(true); - cohortEntry.setReplySender(sender); - cohortEntry.setShard(shard); - handleCanCommit(cohortEntry); - } else { - // The caller does not want immediate commit - the 3-phase commit will be coordinated by the - // front-end so send back a ReadyTransactionReply with our actor path. - sender.tell(readyTransactionReply(shard), shard.self()); - } + // The caller does not want immediate commit - the 3-phase commit will be coordinated by the + // front-end so send back a ReadyTransactionReply with our actor path. + sender.tell(readyTransactionReply(shard), shard.self()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SingleCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SingleCommitCohortProxy.java index e340859321..0823c902ae 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SingleCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SingleCommitCohortProxy.java @@ -7,10 +7,7 @@ */ package org.opendaylight.controller.cluster.datastore; -import akka.actor.ActorSelection; -import akka.dispatch.Futures; import akka.dispatch.OnComplete; -import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.Arrays; @@ -63,11 +60,6 @@ class SingleCommitCohortProxy extends AbstractThreePhaseCommitCohort { operationCallbackRef.get().success(); - if(cohortResponse instanceof ActorSelection) { - handlePreLithiumActorCohort((ActorSelection)cohortResponse, returnFuture); - return; - } - LOG.debug("Tx {} successfully completed direct commit", transactionId); // The Future was the result of a direct commit to the shard, essentially eliding the @@ -101,22 +93,4 @@ class SingleCommitCohortProxy extends AbstractThreePhaseCommitCohort { List> getCohortFutures() { return Arrays.asList(cohortFuture); } - - private void handlePreLithiumActorCohort(ActorSelection actorSelection, final SettableFuture returnFuture) { - // Handle backwards compatibility. An ActorSelection response would be returned from a - // pre-Lithium version. In this case delegate to a ThreePhaseCommitCohortProxy. - delegateCohort = new ThreePhaseCommitCohortProxy(actorContext, - Arrays.asList(Futures.successful(actorSelection)), transactionId); - com.google.common.util.concurrent.Futures.addCallback(delegateCohort.canCommit(), new FutureCallback() { - @Override - public void onSuccess(Boolean canCommit) { - returnFuture.set(canCommit); - } - - @Override - public void onFailure(Throwable t) { - returnFuture.setException(t); - } - }); - } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/BackwardsCompatibleThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/BackwardsCompatibleThreePhaseCommitCohort.java deleted file mode 100644 index f05ef91fc5..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/BackwardsCompatibleThreePhaseCommitCohort.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.datastore.compat; - -import akka.actor.PoisonPill; -import akka.actor.Props; -import akka.japi.Creator; -import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; -import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; -import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; -import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; -import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction; -import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * An actor to maintain backwards compatibility for the base Helium version where the 3-phase commit - * messages don't contain the transactionId. This actor just forwards a new message containing the - * transactionId to the parent Shard. - * - * @author Thomas Pantelis - */ -public class BackwardsCompatibleThreePhaseCommitCohort extends AbstractUntypedActor { - - private static final Logger LOG = LoggerFactory.getLogger(BackwardsCompatibleThreePhaseCommitCohort.class); - - private final String transactionId; - - private BackwardsCompatibleThreePhaseCommitCohort(String transactionId) { - this.transactionId = transactionId; - } - - @Override - public void handleReceive(Object message) throws Exception { - if(message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) { - LOG.debug("BackwardsCompatibleThreePhaseCommitCohort CanCommitTransaction"); - - getContext().parent().forward(new CanCommitTransaction(transactionId).toSerializable(), - getContext()); - } else if(message.getClass().equals(PreCommitTransaction.SERIALIZABLE_CLASS)) { - LOG.debug("BackwardsCompatibleThreePhaseCommitCohort PreCommitTransaction"); - - // The Shard doesn't need the PreCommitTransaction message so just return the reply here. - getSender().tell(new PreCommitTransactionReply().toSerializable(), self()); - } else if(message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) { - LOG.debug("BackwardsCompatibleThreePhaseCommitCohort CommitTransaction"); - - getContext().parent().forward(new CommitTransaction(transactionId).toSerializable(), - getContext()); - - // We're done now - we can self-destruct - self().tell(PoisonPill.getInstance(), self()); - } else if(message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) { - LOG.debug("BackwardsCompatibleThreePhaseCommitCohort AbortTransaction"); - - getContext().parent().forward(new AbortTransaction(transactionId).toSerializable(), - getContext()); - self().tell(PoisonPill.getInstance(), self()); - } - } - - public static Props props(String transactionId) { - return Props.create(new BackwardsCompatibleThreePhaseCommitCohortCreator(transactionId)); - } - - private static class BackwardsCompatibleThreePhaseCommitCohortCreator - implements Creator { - private static final long serialVersionUID = 1L; - - private final String transactionId; - - BackwardsCompatibleThreePhaseCommitCohortCreator(String transactionId) { - this.transactionId = transactionId; - } - - @Override - public BackwardsCompatibleThreePhaseCommitCohort create() throws Exception { - return new BackwardsCompatibleThreePhaseCommitCohort(transactionId); - } - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java deleted file mode 100644 index 2634adaf4c..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.datastore.compat; - -import akka.actor.ActorSelection; -import org.opendaylight.controller.cluster.datastore.DataStoreVersions; -import org.opendaylight.controller.cluster.datastore.OperationLimiter; -import org.opendaylight.controller.cluster.datastore.RemoteTransactionContext; -import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; -import org.opendaylight.controller.cluster.datastore.messages.DeleteData; -import org.opendaylight.controller.cluster.datastore.messages.MergeData; -import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; -import org.opendaylight.controller.cluster.datastore.messages.VersionedExternalizableMessage; -import org.opendaylight.controller.cluster.datastore.messages.WriteData; -import org.opendaylight.controller.cluster.datastore.modification.AbstractModification; -import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; -import org.opendaylight.controller.cluster.datastore.modification.MergeModification; -import org.opendaylight.controller.cluster.datastore.modification.WriteModification; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.concurrent.Future; - -/** - * Implementation of TransactionContextImpl used when talking to a pre-Lithium controller that doesn't - * support the BatchedModifications message. - * - * @author Thomas Pantelis - */ -@Deprecated -public class PreLithiumTransactionContextImpl extends RemoteTransactionContext { - private static final Logger LOG = LoggerFactory.getLogger(PreLithiumTransactionContextImpl.class); - - private final String transactionPath; - - public PreLithiumTransactionContextImpl(TransactionIdentifier identifier, String transactionPath, ActorSelection actor, - ActorContext actorContext, boolean isTxActorLocal, - short remoteTransactionVersion, OperationLimiter limiter) { - super(identifier, actor, actorContext, isTxActorLocal, remoteTransactionVersion, limiter); - this.transactionPath = transactionPath; - } - - @Override - public void executeModification(AbstractModification modification) { - final short remoteTransactionVersion = getRemoteTransactionVersion(); - final YangInstanceIdentifier path = modification.getPath(); - VersionedExternalizableMessage msg = null; - - if(modification instanceof DeleteModification) { - msg = new DeleteData(path, remoteTransactionVersion); - } else if(modification instanceof WriteModification) { - final NormalizedNode data = ((WriteModification) modification).getData(); - - // be sure to check for Merge before Write, since Merge is a subclass of Write - if(modification instanceof MergeModification) { - msg = new MergeData(path, data, remoteTransactionVersion); - } else { - msg = new WriteData(path, data, remoteTransactionVersion); - } - } else { - LOG.error("Invalid modification type " + modification.getClass().getName()); - } - - if(msg != null) { - executeOperationAsync(msg); - } - } - - @Override - public Future readyTransaction() { - LOG.debug("Tx {} readyTransaction called", getIdentifier()); - - // Send the ReadyTransaction message to the Tx actor. - - Future lastReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE); - - return transformReadyReply(lastReplyFuture); - } - - @Override - protected Future transformReadyReply(final Future readyReplyFuture) { - // In base Helium we used to return the local path of the actor which represented - // a remote ThreePhaseCommitCohort. The local path would then be converted to - // a remote path using this resolvePath method. To maintain compatibility with - // a Helium node we need to continue to do this conversion. - // At some point in the future when upgrades from Helium are not supported - // we could remove this code to resolvePath and just use the cohortPath as the - // resolved cohortPath - if (getRemoteTransactionVersion() < DataStoreVersions.HELIUM_1_VERSION) { - return PreLithiumTransactionReadyReplyMapper.transform(readyReplyFuture, getActorContext(), getIdentifier(), transactionPath); - } else { - return super.transformReadyReply(readyReplyFuture); - } - } - - @Override - public boolean supportsDirectCommit() { - return false; - } - - @Override - public Future directCommit() { - throw new UnsupportedOperationException("directCommit is not supported for " + getClass()); - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionReadyReplyMapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionReadyReplyMapper.java deleted file mode 100644 index c8fab0822a..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionReadyReplyMapper.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.datastore.compat; - -import com.google.common.base.Preconditions; -import org.opendaylight.controller.cluster.datastore.TransactionReadyReplyMapper; -import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; -import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; -import scala.concurrent.Future; -import akka.actor.ActorSelection; - -/** - * A {@link Mapper} extracting the {@link ActorSelection} pointing to the actor which - * is backing a particular transaction. This class supports the Helium base release - * behavior. - */ -@Deprecated -public final class PreLithiumTransactionReadyReplyMapper extends TransactionReadyReplyMapper { - private final String transactionPath; - - private PreLithiumTransactionReadyReplyMapper(ActorContext actorContext, TransactionIdentifier identifier, final String transactionPath) { - super(actorContext, identifier); - this.transactionPath = Preconditions.checkNotNull(transactionPath); - } - - @Override - protected String extractCohortPathFrom(final ReadyTransactionReply readyTxReply) { - return getActorContext().resolvePath(transactionPath, readyTxReply.getCohortPath()); - } - - public static Future transform(final Future readyReplyFuture, final ActorContext actorContext, - final TransactionIdentifier identifier, final String transactionPath) { - return readyReplyFuture.transform(new PreLithiumTransactionReadyReplyMapper(actorContext, identifier, transactionPath), - SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher()); - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 7b541076de..1915621efd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -452,30 +452,6 @@ public class ActorContext { } } - /** - * @deprecated This method is present only to support backward compatibility with Helium and should not be - * used any further - * - * - * @param primaryPath - * @param localPathOfRemoteActor - * @return - */ - @Deprecated - public String resolvePath(final String primaryPath, - final String localPathOfRemoteActor) { - StringBuilder builder = new StringBuilder(); - String[] primaryPathElements = primaryPath.split("/"); - builder.append(primaryPathElements[0]).append("//") - .append(primaryPathElements[1]).append(primaryPathElements[2]); - String[] remotePathElements = localPathOfRemoteActor.split("/"); - for (int i = 3; i < remotePathElements.length; i++) { - builder.append("/").append(remotePathElements[i]); - } - - return builder.toString(); - } - /** * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow * us to create a timer for pretty much anything. diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java index 0e1a3b7304..0a50e165fe 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java @@ -381,7 +381,7 @@ public abstract class AbstractTransactionProxyTest { TransactionType type, short transactionVersion, String prefix, ActorRef shardActorRef) { ActorRef txActorRef; - if(type == TransactionType.WRITE_ONLY && transactionVersion >= DataStoreVersions.LITHIUM_VERSION && + if(type == TransactionType.WRITE_ONLY && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled()) { txActorRef = shardActorRef; } else { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index 9e97d28c2c..ae9694a857 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -40,7 +40,6 @@ import org.opendaylight.controller.cluster.datastore.messages.MergeData; import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; -import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.WriteData; import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply; @@ -504,36 +503,6 @@ public class ShardTransactionTest extends AbstractActorTest { }}; } - @Test - public void testOnReceivePreLithiumReadyTransaction() throws Exception { - new JavaTestKit(getSystem()) {{ - final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(), - "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION); - - JavaTestKit watcher = new JavaTestKit(getSystem()); - watcher.watch(transaction); - - transaction.tell(new ReadyTransaction().toSerializable(), getRef()); - - expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS); - watcher.expectMsgClass(duration("5 seconds"), Terminated.class); - }}; - - // test - new JavaTestKit(getSystem()) {{ - final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(), - "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION); - - JavaTestKit watcher = new JavaTestKit(getSystem()); - watcher.watch(transaction); - - transaction.tell(new ReadyTransaction(), getRef()); - - expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class); - watcher.expectMsgClass(duration("5 seconds"), Terminated.class); - }}; - } - @Test public void testOnReceiveCreateSnapshot() throws Exception { new JavaTestKit(getSystem()) {{ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java index 5fb02619e9..1cf45cef01 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java @@ -8,36 +8,16 @@ package org.opendaylight.controller.cluster.datastore.compat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.inOrder; -import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.HELIUM_2_VERSION; import akka.actor.ActorRef; import akka.actor.PoisonPill; -import akka.dispatch.Dispatchers; -import akka.dispatch.OnComplete; -import akka.pattern.Patterns; import akka.testkit.TestActorRef; -import akka.util.Timeout; -import com.google.common.base.Optional; import java.util.Collections; import java.util.HashSet; import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; -import org.mockito.InOrder; import org.opendaylight.controller.cluster.datastore.AbstractShardTest; import org.opendaylight.controller.cluster.datastore.Shard; -import org.opendaylight.controller.cluster.datastore.ShardDataTree; -import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort; import org.opendaylight.controller.cluster.datastore.ShardTestKit; -import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; -import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; -import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; @@ -55,16 +35,11 @@ import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; -import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; -import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; /** * Unit tests for backwards compatibility with pre-Lithium versions. @@ -196,188 +171,4 @@ public class PreLithiumShardTest extends AbstractShardTest { testRecovery(listEntryKeys); } - - @SuppressWarnings({ "unchecked" }) - @Test - public void testPreLithiumConcurrentThreePhaseCommits() throws Throwable { - new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), - newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testPreLithiumConcurrentThreePhaseCommits"); - - waitUntilLeader(shard); - - // Setup 3 simulated transactions with mock cohorts backed by real cohorts. - - ShardDataTree dataStore = shard.underlyingActor().getDataStore(); - - String transactionID1 = "tx1"; - MutableCompositeModification modification1 = new MutableCompositeModification(); - ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, - TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); - - String transactionID2 = "tx2"; - MutableCompositeModification modification2 = new MutableCompositeModification(); - ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, - TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), - modification2); - - String transactionID3 = "tx3"; - MutableCompositeModification modification3 = new MutableCompositeModification(); - ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, - YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) - .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), - modification3); - - long timeoutSec = 5; - final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS); - final Timeout timeout = new Timeout(duration); - - // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent - // by the ShardTransaction. - - shard.tell(prepareForwardedReadyTransaction(cohort1, transactionID1, HELIUM_2_VERSION, false), getRef()); - ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable( - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS)); - assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath()); - - // Send the CanCommitTransaction message for the first Tx. - - shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef()); - CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( - expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); - assertEquals("Can commit", true, canCommitReply.getCanCommit()); - - // Send the ForwardedReadyTransaction for the next 2 Tx's. - - shard.tell(prepareForwardedReadyTransaction(cohort2, transactionID2, HELIUM_2_VERSION, false), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - - shard.tell(prepareForwardedReadyTransaction(cohort3, transactionID3, HELIUM_2_VERSION, false), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - - // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and - // processed after the first Tx completes. - - Future canCommitFuture1 = Patterns.ask(shard, - new CanCommitTransaction(transactionID2).toSerializable(), timeout); - - Future canCommitFuture2 = Patterns.ask(shard, - new CanCommitTransaction(transactionID3).toSerializable(), timeout); - - // Send the CommitTransaction message for the first Tx. After it completes, it should - // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd. - - shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef()); - expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); - - // Wait for the next 2 Tx's to complete. - - final AtomicReference caughtEx = new AtomicReference<>(); - final CountDownLatch commitLatch = new CountDownLatch(2); - - class OnFutureComplete extends OnComplete { - private final Class expRespType; - - OnFutureComplete(final Class expRespType) { - this.expRespType = expRespType; - } - - @Override - public void onComplete(final Throwable error, final Object resp) { - if(error != null) { - caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error)); - } else { - try { - assertEquals("Commit response type", expRespType, resp.getClass()); - onSuccess(resp); - } catch (Exception e) { - caughtEx.set(e); - } - } - } - - void onSuccess(final Object resp) throws Exception { - } - } - - class OnCommitFutureComplete extends OnFutureComplete { - OnCommitFutureComplete() { - super(CommitTransactionReply.SERIALIZABLE_CLASS); - } - - @Override - public void onComplete(final Throwable error, final Object resp) { - super.onComplete(error, resp); - commitLatch.countDown(); - } - } - - class OnCanCommitFutureComplete extends OnFutureComplete { - private final String transactionID; - - OnCanCommitFutureComplete(final String transactionID) { - super(CanCommitTransactionReply.SERIALIZABLE_CLASS); - this.transactionID = transactionID; - } - - @Override - void onSuccess(final Object resp) throws Exception { - CanCommitTransactionReply canCommitReply = - CanCommitTransactionReply.fromSerializable(resp); - assertEquals("Can commit", true, canCommitReply.getCanCommit()); - - Future commitFuture = Patterns.ask(shard, - new CommitTransaction(transactionID).toSerializable(), timeout); - commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher()); - } - } - - canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2), - getSystem().dispatcher()); - - canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3), - getSystem().dispatcher()); - - boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS); - - if(caughtEx.get() != null) { - throw caughtEx.get(); - } - - assertEquals("Commits complete", true, done); - - InOrder inOrder = inOrder(cohort1, cohort2, cohort3); - inOrder.verify(cohort1).canCommit(); - inOrder.verify(cohort1).preCommit(); - inOrder.verify(cohort1).commit(); - inOrder.verify(cohort2).canCommit(); - inOrder.verify(cohort2).preCommit(); - inOrder.verify(cohort2).commit(); - inOrder.verify(cohort3).canCommit(); - inOrder.verify(cohort3).preCommit(); - inOrder.verify(cohort3).commit(); - - // Verify data in the data store. - - NormalizedNode outerList = readStore(shard, TestModel.OUTER_LIST_PATH); - assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList); - assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable", - outerList.getValue() instanceof Iterable); - Object entry = ((Iterable)outerList.getValue()).iterator().next(); - assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode", - entry instanceof MapEntryNode); - MapEntryNode mapEntry = (MapEntryNode)entry; - Optional> idLeaf = - mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME)); - assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent()); - assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue()); - - verifyLastApplied(shard, 2); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); - }}; - } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java deleted file mode 100644 index 6ca17838d0..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.datastore.compat; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.argThat; -import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.isA; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.verify; -import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE; -import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY; -import akka.actor.ActorRef; -import akka.dispatch.Futures; -import akka.util.Timeout; -import com.google.common.base.Optional; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import org.junit.Test; -import org.mockito.ArgumentMatcher; -import org.mockito.Mockito; -import org.opendaylight.controller.cluster.datastore.AbstractThreePhaseCommitCohort; -import org.opendaylight.controller.cluster.datastore.AbstractTransactionProxyTest; -import org.opendaylight.controller.cluster.datastore.DataStoreVersions; -import org.opendaylight.controller.cluster.datastore.TransactionProxy; -import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; -import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; -import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.DeleteData; -import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply; -import org.opendaylight.controller.cluster.datastore.messages.MergeData; -import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply; -import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; -import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.WriteData; -import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply; -import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; -import org.opendaylight.controller.md.cluster.datastore.model.TestModel; -import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages; -import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import scala.concurrent.Future; - -/** - * Unit tests for backwards compatibility with pre-Lithium versions. - * - * @author Thomas Pantelis - */ -public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest { - - private static WriteData eqLegacyWriteData(final NormalizedNode nodeToWrite) { - ArgumentMatcher matcher = new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - if(ShardTransactionMessages.WriteData.class.equals(argument.getClass())) { - WriteData obj = WriteData.fromSerializable(argument); - return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite); - } - - return false; - } - }; - - return argThat(matcher); - } - - private static MergeData eqLegacyMergeData(final NormalizedNode nodeToWrite) { - ArgumentMatcher matcher = new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - if(ShardTransactionMessages.MergeData.class.equals(argument.getClass())) { - MergeData obj = MergeData.fromSerializable(argument); - return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite); - } - - return false; - } - }; - - return argThat(matcher); - } - - private static DeleteData eqLegacyDeleteData(final YangInstanceIdentifier expPath) { - ArgumentMatcher matcher = new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - return ShardTransactionMessages.DeleteData.class.equals(argument.getClass()) && - DeleteData.fromSerializable(argument).getPath().equals(expPath); - } - }; - - return argThat(matcher); - } - - private static CanCommitTransaction eqCanCommitTransaction(final String transactionID) { - ArgumentMatcher matcher = new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - return ThreePhaseCommitCohortMessages.CanCommitTransaction.class.equals(argument.getClass()) && - CanCommitTransaction.fromSerializable(argument).getTransactionID().equals(transactionID); - } - }; - - return argThat(matcher); - } - - private static CommitTransaction eqCommitTransaction(final String transactionID) { - ArgumentMatcher matcher = new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - return ThreePhaseCommitCohortMessages.CommitTransaction.class.equals(argument.getClass()) && - CommitTransaction.fromSerializable(argument).getTransactionID().equals(transactionID); - } - }; - - return argThat(matcher); - } - - private static Future readySerializedTxReply(String path, short version) { - return Futures.successful(new ReadyTransactionReply(path, version).toSerializable()); - } - - private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception { - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, version, - DefaultShardStrategy.DEFAULT_SHARD); - - NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - - doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqSerializedReadData(TestModel.TEST_PATH)); - - doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode)); - - doReturn(Futures.successful(new MergeDataReply().toSerializable(version))).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyMergeData(testNode)); - - doReturn(Futures.successful(new DeleteDataReply().toSerializable(version))).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyDeleteData(TestModel.TEST_PATH)); - - doReturn(readySerializedTxReply(actorRef.path().toString(), version)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); - - doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()), - eq(actorRef.path().toString())); - - TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE); - - Optional> readOptional = transactionProxy.read(TestModel.TEST_PATH). - get(5, TimeUnit.SECONDS); - - assertEquals("NormalizedNode isPresent", true, readOptional.isPresent()); - assertEquals("Response NormalizedNode", testNode, readOptional.get()); - - transactionProxy.write(TestModel.TEST_PATH, testNode); - - transactionProxy.merge(TestModel.TEST_PATH, testNode); - - transactionProxy.delete(TestModel.TEST_PATH); - - AbstractThreePhaseCommitCohort proxy = transactionProxy.ready(); - - verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); - - doThreePhaseCommit(actorRef, transactionProxy, proxy); - - return actorRef; - } - - private void doThreePhaseCommit(ActorRef actorRef, TransactionProxy transactionProxy, - AbstractThreePhaseCommitCohort proxy) throws InterruptedException, ExecutionException, TimeoutException { - doReturn(Futures.successful(CanCommitTransactionReply.YES.toSerializable())).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), eqCanCommitTransaction( - transactionProxy.getIdentifier().toString()), any(Timeout.class)); - - doReturn(Futures.successful(new CommitTransactionReply().toSerializable())).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), eqCommitTransaction( - transactionProxy.getIdentifier().toString()), any(Timeout.class)); - - Boolean canCommit = proxy.canCommit().get(3, TimeUnit.SECONDS); - assertEquals("canCommit", true, canCommit.booleanValue()); - - proxy.preCommit().get(3, TimeUnit.SECONDS); - - proxy.commit().get(3, TimeUnit.SECONDS); - - verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), eqCanCommitTransaction( - transactionProxy.getIdentifier().toString()), any(Timeout.class)); - - verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), eqCommitTransaction( - transactionProxy.getIdentifier().toString()), any(Timeout.class)); - } - - @Test - public void testCompatibilityWithBaseHeliumVersion() throws Exception { - ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.BASE_HELIUM_VERSION); - - verify(mockActorContext).resolvePath(eq(actorRef.path().toString()), - eq(actorRef.path().toString())); - } - - @Test - public void testCompatibilityWithHeliumR1Version() throws Exception { - ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.HELIUM_1_VERSION); - - verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()), - eq(actorRef.path().toString())); - } - - @Test - public void testWriteOnlyCompatibilityWithHeliumR2Version() throws Exception { - short version = DataStoreVersions.HELIUM_2_VERSION; - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, version, - DefaultShardStrategy.DEFAULT_SHARD); - - NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - - doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode)); - - doReturn(readySerializedTxReply(actorRef.path().toString(), version)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); - - TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY); - - transactionProxy.write(TestModel.TEST_PATH, testNode); - - DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - - AbstractThreePhaseCommitCohort proxy = (AbstractThreePhaseCommitCohort) ready; - - verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); - - doThreePhaseCommit(actorRef, transactionProxy, proxy); - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/ShardTransactionHeliumBackwardsCompatibilityTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/ShardTransactionHeliumBackwardsCompatibilityTest.java deleted file mode 100644 index 42b6d0d961..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/ShardTransactionHeliumBackwardsCompatibilityTest.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.datastore.compat; - -import akka.actor.ActorRef; -import akka.actor.ActorSelection; -import akka.actor.PoisonPill; -import akka.actor.Props; -import akka.dispatch.Dispatchers; -import akka.testkit.TestActorRef; -import org.junit.Assert; -import org.junit.Test; -import org.opendaylight.controller.cluster.datastore.AbstractActorTest; -import org.opendaylight.controller.cluster.datastore.DataStoreVersions; -import org.opendaylight.controller.cluster.datastore.DatastoreContext; -import org.opendaylight.controller.cluster.datastore.Shard; -import org.opendaylight.controller.cluster.datastore.ShardTest; -import org.opendaylight.controller.cluster.datastore.ShardTestKit; -import org.opendaylight.controller.cluster.datastore.TransactionType; -import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; -import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; -import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.WriteData; -import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply; -import org.opendaylight.controller.md.cluster.datastore.model.TestModel; -import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages; -import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; -import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import scala.concurrent.duration.FiniteDuration; - -/** - * Tests backwards compatibility support from Helium-1 to Helium. - * - * In Helium-1, the 3-phase commit support was moved from the ThreePhaseCommitCohort actor to the - * Shard. As a consequence, a new transactionId field was added to the CanCommitTransaction, - * CommitTransaction and AbortTransaction messages. With a base Helium version node, these messages - * would be sans transactionId so this test verifies the Shard handles that properly. - * - * @author Thomas Pantelis - */ -public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractActorTest { - - @Test - public void testTransactionCommit() throws Exception { - new ShardTestKit(getSystem()) {{ - SchemaContext schemaContext = TestModel.createTestContext(); - Props shardProps = Shard.builder().id(ShardIdentifier.builder().memberName("member-1"). - shardName("inventory").type("config").build()).datastoreContext(DatastoreContext.newBuilder(). - shardHeartbeatIntervalInMillis(100).build()).schemaContext(schemaContext).props(). - withDispatcher(Dispatchers.DefaultDispatcherId()); - - final TestActorRef shard = TestActorRef.create(getSystem(), shardProps, - "testTransactionCommit"); - - waitUntilLeader(shard); - - // Send CreateTransaction message with no messages version - - String transactionID = "txn-1"; - shard.tell(ShardTransactionMessages.CreateTransaction.newBuilder() - .setTransactionId(transactionID) - .setTransactionType(TransactionType.WRITE_ONLY.ordinal()) - .setTransactionChainId("").build(), getRef()); - - final FiniteDuration duration = duration("5 seconds"); - - CreateTransactionReply reply = expectMsgClass(duration, CreateTransactionReply.class); - - ActorSelection txActor = getSystem().actorSelection(reply.getTransactionActorPath()); - - // Write data to the Tx - - txActor.tell(new WriteData(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.BASE_HELIUM_VERSION). - toSerializable(), getRef()); - - expectMsgClass(duration, ShardTransactionMessages.WriteDataReply.class); - - // Ready the Tx - - txActor.tell(new ReadyTransaction().toSerializable(), getRef()); - - ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(expectMsgClass( - duration, ReadyTransactionReply.SERIALIZABLE_CLASS)); - - ActorSelection cohortActor = getSystem().actorSelection(readyReply.getCohortPath()); - - // Send the CanCommitTransaction message with no transactionId. - - cohortActor.tell(ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder().build(), - getRef()); - - expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS); - - // Send the PreCommitTransaction message with no transactionId. - - cohortActor.tell(ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder().build(), - getRef()); - - expectMsgClass(duration, PreCommitTransactionReply.SERIALIZABLE_CLASS); - - // Send the CommitTransaction message with no transactionId. - - cohortActor.tell(ThreePhaseCommitCohortMessages.CommitTransaction.newBuilder().build(), - getRef()); - - expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); - - NormalizedNode node = ShardTest.readStore(shard, TestModel.TEST_PATH); - Assert.assertNotNull("Data not found in store", node); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); - }}; - } - - @Test - public void testTransactionAbort() throws Exception { - new ShardTestKit(getSystem()) {{ - SchemaContext schemaContext = TestModel.createTestContext(); - Props shardProps = Shard.builder().id(ShardIdentifier.builder().memberName("member-1"). - shardName("inventory").type("config").build()).datastoreContext(DatastoreContext.newBuilder(). - shardHeartbeatIntervalInMillis(100).build()).schemaContext(schemaContext).props(). - withDispatcher(Dispatchers.DefaultDispatcherId()); - - final TestActorRef shard = TestActorRef.create(getSystem(), shardProps, - "testTransactionAbort"); - - waitUntilLeader(shard); - - // Send CreateTransaction message with no messages version - - String transactionID = "txn-1"; - shard.tell(ShardTransactionMessages.CreateTransaction.newBuilder() - .setTransactionId(transactionID) - .setTransactionType(TransactionType.WRITE_ONLY.ordinal()) - .setTransactionChainId("").build(), getRef()); - - final FiniteDuration duration = duration("5 seconds"); - - CreateTransactionReply reply = expectMsgClass(duration, CreateTransactionReply.class); - - ActorSelection txActor = getSystem().actorSelection(reply.getTransactionActorPath()); - - // Write data to the Tx - - txActor.tell(new WriteData(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), - DataStoreVersions.BASE_HELIUM_VERSION).toSerializable(), getRef()); - - expectMsgClass(duration, WriteDataReply.INSTANCE.toSerializable( - DataStoreVersions.BASE_HELIUM_VERSION).getClass()); - - // Ready the Tx - - txActor.tell(new ReadyTransaction().toSerializable(), getRef()); - - ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(expectMsgClass( - duration, ReadyTransactionReply.SERIALIZABLE_CLASS)); - - ActorSelection cohortActor = getSystem().actorSelection(readyReply.getCohortPath()); - - // Send the CanCommitTransaction message with no transactionId. - - cohortActor.tell(ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder().build(), - getRef()); - - expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS); - - // Send the AbortTransaction message with no transactionId. - - cohortActor.tell(ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder().build(), - getRef()); - - expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); - }}; - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index a2794bde9c..14536ba7a0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -292,53 +292,6 @@ public class ActorContextTest extends AbstractActorTest{ assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/")); } - @Test - public void testResolvePathForRemoteActor() { - ActorContext actorContext = - new ActorContext(getSystem(), mock(ActorRef.class), mock( - ClusterWrapper.class), - mock(Configuration.class)); - - String actual = actorContext.resolvePath( - "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard", - "akka://system/user/shardmanager/shard/transaction"); - - String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction"; - - assertEquals(expected, actual); - } - - @Test - public void testResolvePathForLocalActor() { - ActorContext actorContext = - new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class)); - - String actual = actorContext.resolvePath( - "akka://system/user/shardmanager/shard", - "akka://system/user/shardmanager/shard/transaction"); - - String expected = "akka://system/user/shardmanager/shard/transaction"; - - assertEquals(expected, actual); - } - - @Test - public void testResolvePathForRemoteActorWithProperRemoteAddress() { - ActorContext actorContext = - new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class)); - - String actual = actorContext.resolvePath( - "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard", - "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction"); - - String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction"; - - assertEquals(expected, actual); - } - - @Test public void testClientDispatcherIsGlobalDispatcher(){ ActorContext actorContext = -- 2.36.6