import akka.util.Timeout;
import com.google.common.base.Preconditions;
import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
void setPrimaryShard(ActorSelection primaryShard, short primaryVersion) {
this.primaryShard = primaryShard;
- if (getTransactionType() == TransactionType.WRITE_ONLY && primaryVersion >= DataStoreVersions.LITHIUM_VERSION &&
+ if (getTransactionType() == TransactionType.WRITE_ONLY &&
getActorContext().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
getIdentifier(), primaryShard);
// TxActor is always created where the leader of the shard is.
// Check if TxActor is created in the same node
boolean isTxActorLocal = getActorContext().isPathLocal(transactionPath);
- final TransactionContext ret;
-
- if (remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
- ret = new PreLithiumTransactionContextImpl(transactionContextWrapper.getIdentifier(), transactionPath, transactionActor,
- getActorContext(), isTxActorLocal, remoteTransactionVersion, transactionContextWrapper.getLimiter());
- } else {
- ret = new RemoteTransactionContext(transactionContextWrapper.getIdentifier(), transactionActor, getActorContext(),
- isTxActorLocal, remoteTransactionVersion, transactionContextWrapper.getLimiter());
- }
+ final TransactionContext ret = new RemoteTransactionContext(transactionContextWrapper.getIdentifier(),
+ transactionActor, getActorContext(), isTxActorLocal, remoteTransactionVersion,
+ transactionContextWrapper.getLimiter());
if(parent.getType() == TransactionType.READ_ONLY) {
TransactionContextCleanup.track(this, ret);
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
return;
}
- if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
- // Return our actor path as we'll handle the three phase commit except if the Tx client
- // version < Helium-1 version which means the Tx was initiated by a base Helium version node.
- // In that case, the subsequent 3-phase commit messages won't contain the transactionId so to
- // maintain backwards compatibility, we create a separate cohort actor to provide the compatible behavior.
- ActorRef replyActorPath = shard.self();
- if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
- log.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", name);
- replyActorPath = shard.getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
- ready.getTransactionID()));
- }
-
- ReadyTransactionReply readyTransactionReply =
- new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
- ready.getTxnClientVersion());
- sender.tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
- readyTransactionReply, shard.self());
+ if(ready.isDoImmediateCommit()) {
+ cohortEntry.setDoImmediateCommit(true);
+ cohortEntry.setReplySender(sender);
+ cohortEntry.setShard(shard);
+ handleCanCommit(cohortEntry);
} else {
- if(ready.isDoImmediateCommit()) {
- cohortEntry.setDoImmediateCommit(true);
- cohortEntry.setReplySender(sender);
- cohortEntry.setShard(shard);
- handleCanCommit(cohortEntry);
- } else {
- // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
- // front-end so send back a ReadyTransactionReply with our actor path.
- sender.tell(readyTransactionReply(shard), shard.self());
- }
+ // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
+ // front-end so send back a ReadyTransactionReply with our actor path.
+ sender.tell(readyTransactionReply(shard), shard.self());
}
}
*/
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorSelection;
-import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
-import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.Arrays;
operationCallbackRef.get().success();
- if(cohortResponse instanceof ActorSelection) {
- handlePreLithiumActorCohort((ActorSelection)cohortResponse, returnFuture);
- return;
- }
-
LOG.debug("Tx {} successfully completed direct commit", transactionId);
// The Future was the result of a direct commit to the shard, essentially eliding the
List<Future<Object>> getCohortFutures() {
return Arrays.asList(cohortFuture);
}
-
- private void handlePreLithiumActorCohort(ActorSelection actorSelection, final SettableFuture<Boolean> returnFuture) {
- // Handle backwards compatibility. An ActorSelection response would be returned from a
- // pre-Lithium version. In this case delegate to a ThreePhaseCommitCohortProxy.
- delegateCohort = new ThreePhaseCommitCohortProxy(actorContext,
- Arrays.asList(Futures.successful(actorSelection)), transactionId);
- com.google.common.util.concurrent.Futures.addCallback(delegateCohort.canCommit(), new FutureCallback<Boolean>() {
- @Override
- public void onSuccess(Boolean canCommit) {
- returnFuture.set(canCommit);
- }
-
- @Override
- public void onFailure(Throwable t) {
- returnFuture.setException(t);
- }
- });
- }
}
+++ /dev/null
-/*
- * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.compat;
-
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.japi.Creator;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An actor to maintain backwards compatibility for the base Helium version where the 3-phase commit
- * messages don't contain the transactionId. This actor just forwards a new message containing the
- * transactionId to the parent Shard.
- *
- * @author Thomas Pantelis
- */
-public class BackwardsCompatibleThreePhaseCommitCohort extends AbstractUntypedActor {
-
- private static final Logger LOG = LoggerFactory.getLogger(BackwardsCompatibleThreePhaseCommitCohort.class);
-
- private final String transactionId;
-
- private BackwardsCompatibleThreePhaseCommitCohort(String transactionId) {
- this.transactionId = transactionId;
- }
-
- @Override
- public void handleReceive(Object message) throws Exception {
- if(message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
- LOG.debug("BackwardsCompatibleThreePhaseCommitCohort CanCommitTransaction");
-
- getContext().parent().forward(new CanCommitTransaction(transactionId).toSerializable(),
- getContext());
- } else if(message.getClass().equals(PreCommitTransaction.SERIALIZABLE_CLASS)) {
- LOG.debug("BackwardsCompatibleThreePhaseCommitCohort PreCommitTransaction");
-
- // The Shard doesn't need the PreCommitTransaction message so just return the reply here.
- getSender().tell(new PreCommitTransactionReply().toSerializable(), self());
- } else if(message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
- LOG.debug("BackwardsCompatibleThreePhaseCommitCohort CommitTransaction");
-
- getContext().parent().forward(new CommitTransaction(transactionId).toSerializable(),
- getContext());
-
- // We're done now - we can self-destruct
- self().tell(PoisonPill.getInstance(), self());
- } else if(message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
- LOG.debug("BackwardsCompatibleThreePhaseCommitCohort AbortTransaction");
-
- getContext().parent().forward(new AbortTransaction(transactionId).toSerializable(),
- getContext());
- self().tell(PoisonPill.getInstance(), self());
- }
- }
-
- public static Props props(String transactionId) {
- return Props.create(new BackwardsCompatibleThreePhaseCommitCohortCreator(transactionId));
- }
-
- private static class BackwardsCompatibleThreePhaseCommitCohortCreator
- implements Creator<BackwardsCompatibleThreePhaseCommitCohort> {
- private static final long serialVersionUID = 1L;
-
- private final String transactionId;
-
- BackwardsCompatibleThreePhaseCommitCohortCreator(String transactionId) {
- this.transactionId = transactionId;
- }
-
- @Override
- public BackwardsCompatibleThreePhaseCommitCohort create() throws Exception {
- return new BackwardsCompatibleThreePhaseCommitCohort(transactionId);
- }
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.compat;
-
-import akka.actor.ActorSelection;
-import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
-import org.opendaylight.controller.cluster.datastore.OperationLimiter;
-import org.opendaylight.controller.cluster.datastore.RemoteTransactionContext;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
-import org.opendaylight.controller.cluster.datastore.messages.MergeData;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.VersionedExternalizableMessage;
-import org.opendaylight.controller.cluster.datastore.messages.WriteData;
-import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
-import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
-import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
-import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-
-/**
- * Implementation of TransactionContextImpl used when talking to a pre-Lithium controller that doesn't
- * support the BatchedModifications message.
- *
- * @author Thomas Pantelis
- */
-@Deprecated
-public class PreLithiumTransactionContextImpl extends RemoteTransactionContext {
- private static final Logger LOG = LoggerFactory.getLogger(PreLithiumTransactionContextImpl.class);
-
- private final String transactionPath;
-
- public PreLithiumTransactionContextImpl(TransactionIdentifier identifier, String transactionPath, ActorSelection actor,
- ActorContext actorContext, boolean isTxActorLocal,
- short remoteTransactionVersion, OperationLimiter limiter) {
- super(identifier, actor, actorContext, isTxActorLocal, remoteTransactionVersion, limiter);
- this.transactionPath = transactionPath;
- }
-
- @Override
- public void executeModification(AbstractModification modification) {
- final short remoteTransactionVersion = getRemoteTransactionVersion();
- final YangInstanceIdentifier path = modification.getPath();
- VersionedExternalizableMessage msg = null;
-
- if(modification instanceof DeleteModification) {
- msg = new DeleteData(path, remoteTransactionVersion);
- } else if(modification instanceof WriteModification) {
- final NormalizedNode<?, ?> data = ((WriteModification) modification).getData();
-
- // be sure to check for Merge before Write, since Merge is a subclass of Write
- if(modification instanceof MergeModification) {
- msg = new MergeData(path, data, remoteTransactionVersion);
- } else {
- msg = new WriteData(path, data, remoteTransactionVersion);
- }
- } else {
- LOG.error("Invalid modification type " + modification.getClass().getName());
- }
-
- if(msg != null) {
- executeOperationAsync(msg);
- }
- }
-
- @Override
- public Future<ActorSelection> readyTransaction() {
- LOG.debug("Tx {} readyTransaction called", getIdentifier());
-
- // Send the ReadyTransaction message to the Tx actor.
-
- Future<Object> lastReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
-
- return transformReadyReply(lastReplyFuture);
- }
-
- @Override
- protected Future<ActorSelection> transformReadyReply(final Future<Object> readyReplyFuture) {
- // In base Helium we used to return the local path of the actor which represented
- // a remote ThreePhaseCommitCohort. The local path would then be converted to
- // a remote path using this resolvePath method. To maintain compatibility with
- // a Helium node we need to continue to do this conversion.
- // At some point in the future when upgrades from Helium are not supported
- // we could remove this code to resolvePath and just use the cohortPath as the
- // resolved cohortPath
- if (getRemoteTransactionVersion() < DataStoreVersions.HELIUM_1_VERSION) {
- return PreLithiumTransactionReadyReplyMapper.transform(readyReplyFuture, getActorContext(), getIdentifier(), transactionPath);
- } else {
- return super.transformReadyReply(readyReplyFuture);
- }
- }
-
- @Override
- public boolean supportsDirectCommit() {
- return false;
- }
-
- @Override
- public Future<Object> directCommit() {
- throw new UnsupportedOperationException("directCommit is not supported for " + getClass());
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.compat;
-
-import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.datastore.TransactionReadyReplyMapper;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import scala.concurrent.Future;
-import akka.actor.ActorSelection;
-
-/**
- * A {@link Mapper} extracting the {@link ActorSelection} pointing to the actor which
- * is backing a particular transaction. This class supports the Helium base release
- * behavior.
- */
-@Deprecated
-public final class PreLithiumTransactionReadyReplyMapper extends TransactionReadyReplyMapper {
- private final String transactionPath;
-
- private PreLithiumTransactionReadyReplyMapper(ActorContext actorContext, TransactionIdentifier identifier, final String transactionPath) {
- super(actorContext, identifier);
- this.transactionPath = Preconditions.checkNotNull(transactionPath);
- }
-
- @Override
- protected String extractCohortPathFrom(final ReadyTransactionReply readyTxReply) {
- return getActorContext().resolvePath(transactionPath, readyTxReply.getCohortPath());
- }
-
- public static Future<ActorSelection> transform(final Future<Object> readyReplyFuture, final ActorContext actorContext,
- final TransactionIdentifier identifier, final String transactionPath) {
- return readyReplyFuture.transform(new PreLithiumTransactionReadyReplyMapper(actorContext, identifier, transactionPath),
- SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
- }
-}
}
}
- /**
- * @deprecated This method is present only to support backward compatibility with Helium and should not be
- * used any further
- *
- *
- * @param primaryPath
- * @param localPathOfRemoteActor
- * @return
- */
- @Deprecated
- public String resolvePath(final String primaryPath,
- final String localPathOfRemoteActor) {
- StringBuilder builder = new StringBuilder();
- String[] primaryPathElements = primaryPath.split("/");
- builder.append(primaryPathElements[0]).append("//")
- .append(primaryPathElements[1]).append(primaryPathElements[2]);
- String[] remotePathElements = localPathOfRemoteActor.split("/");
- for (int i = 3; i < remotePathElements.length; i++) {
- builder.append("/").append(remotePathElements[i]);
- }
-
- return builder.toString();
- }
-
/**
* This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
* us to create a timer for pretty much anything.
TransactionType type, short transactionVersion, String prefix, ActorRef shardActorRef) {
ActorRef txActorRef;
- if(type == TransactionType.WRITE_ONLY && transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
+ if(type == TransactionType.WRITE_ONLY &&
dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled()) {
txActorRef = shardActorRef;
} else {
import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
}};
}
- @Test
- public void testOnReceivePreLithiumReadyTransaction() throws Exception {
- new JavaTestKit(getSystem()) {{
- final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
- "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
-
- JavaTestKit watcher = new JavaTestKit(getSystem());
- watcher.watch(transaction);
-
- transaction.tell(new ReadyTransaction().toSerializable(), getRef());
-
- expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
- watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
- }};
-
- // test
- new JavaTestKit(getSystem()) {{
- final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
- "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
-
- JavaTestKit watcher = new JavaTestKit(getSystem());
- watcher.watch(transaction);
-
- transaction.tell(new ReadyTransaction(), getRef());
-
- expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
- watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
- }};
- }
-
@Test
public void testOnReceiveCreateSnapshot() throws Exception {
new JavaTestKit(getSystem()) {{
package org.opendaylight.controller.cluster.datastore.compat;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.inOrder;
-import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.HELIUM_2_VERSION;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
-import akka.dispatch.Dispatchers;
-import akka.dispatch.OnComplete;
-import akka.pattern.Patterns;
import akka.testkit.TestActorRef;
-import akka.util.Timeout;
-import com.google.common.base.Optional;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
-import org.mockito.InOrder;
import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
import org.opendaylight.controller.cluster.datastore.Shard;
-import org.opendaylight.controller.cluster.datastore.ShardDataTree;
-import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort;
import org.opendaylight.controller.cluster.datastore.ShardTestKit;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
-import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
-import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
/**
* Unit tests for backwards compatibility with pre-Lithium versions.
testRecovery(listEntryKeys);
}
-
- @SuppressWarnings({ "unchecked" })
- @Test
- public void testPreLithiumConcurrentThreePhaseCommits() throws Throwable {
- new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testPreLithiumConcurrentThreePhaseCommits");
-
- waitUntilLeader(shard);
-
- // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
-
- ShardDataTree dataStore = shard.underlyingActor().getDataStore();
-
- String transactionID1 = "tx1";
- MutableCompositeModification modification1 = new MutableCompositeModification();
- ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
-
- String transactionID2 = "tx2";
- MutableCompositeModification modification2 = new MutableCompositeModification();
- ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
- TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
- modification2);
-
- String transactionID3 = "tx3";
- MutableCompositeModification modification3 = new MutableCompositeModification();
- ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
- YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
- .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
- modification3);
-
- long timeoutSec = 5;
- final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
- final Timeout timeout = new Timeout(duration);
-
- // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
- // by the ShardTransaction.
-
- shard.tell(prepareForwardedReadyTransaction(cohort1, transactionID1, HELIUM_2_VERSION, false), getRef());
- ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
- assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
-
- // Send the CanCommitTransaction message for the first Tx.
-
- shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
- CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
- assertEquals("Can commit", true, canCommitReply.getCanCommit());
-
- // Send the ForwardedReadyTransaction for the next 2 Tx's.
-
- shard.tell(prepareForwardedReadyTransaction(cohort2, transactionID2, HELIUM_2_VERSION, false), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
-
- shard.tell(prepareForwardedReadyTransaction(cohort3, transactionID3, HELIUM_2_VERSION, false), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
-
- // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
- // processed after the first Tx completes.
-
- Future<Object> canCommitFuture1 = Patterns.ask(shard,
- new CanCommitTransaction(transactionID2).toSerializable(), timeout);
-
- Future<Object> canCommitFuture2 = Patterns.ask(shard,
- new CanCommitTransaction(transactionID3).toSerializable(), timeout);
-
- // Send the CommitTransaction message for the first Tx. After it completes, it should
- // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
-
- shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
- expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
-
- // Wait for the next 2 Tx's to complete.
-
- final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
- final CountDownLatch commitLatch = new CountDownLatch(2);
-
- class OnFutureComplete extends OnComplete<Object> {
- private final Class<?> expRespType;
-
- OnFutureComplete(final Class<?> expRespType) {
- this.expRespType = expRespType;
- }
-
- @Override
- public void onComplete(final Throwable error, final Object resp) {
- if(error != null) {
- caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
- } else {
- try {
- assertEquals("Commit response type", expRespType, resp.getClass());
- onSuccess(resp);
- } catch (Exception e) {
- caughtEx.set(e);
- }
- }
- }
-
- void onSuccess(final Object resp) throws Exception {
- }
- }
-
- class OnCommitFutureComplete extends OnFutureComplete {
- OnCommitFutureComplete() {
- super(CommitTransactionReply.SERIALIZABLE_CLASS);
- }
-
- @Override
- public void onComplete(final Throwable error, final Object resp) {
- super.onComplete(error, resp);
- commitLatch.countDown();
- }
- }
-
- class OnCanCommitFutureComplete extends OnFutureComplete {
- private final String transactionID;
-
- OnCanCommitFutureComplete(final String transactionID) {
- super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
- this.transactionID = transactionID;
- }
-
- @Override
- void onSuccess(final Object resp) throws Exception {
- CanCommitTransactionReply canCommitReply =
- CanCommitTransactionReply.fromSerializable(resp);
- assertEquals("Can commit", true, canCommitReply.getCanCommit());
-
- Future<Object> commitFuture = Patterns.ask(shard,
- new CommitTransaction(transactionID).toSerializable(), timeout);
- commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
- }
- }
-
- canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
- getSystem().dispatcher());
-
- canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
- getSystem().dispatcher());
-
- boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
-
- if(caughtEx.get() != null) {
- throw caughtEx.get();
- }
-
- assertEquals("Commits complete", true, done);
-
- InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
- inOrder.verify(cohort1).canCommit();
- inOrder.verify(cohort1).preCommit();
- inOrder.verify(cohort1).commit();
- inOrder.verify(cohort2).canCommit();
- inOrder.verify(cohort2).preCommit();
- inOrder.verify(cohort2).commit();
- inOrder.verify(cohort3).canCommit();
- inOrder.verify(cohort3).preCommit();
- inOrder.verify(cohort3).commit();
-
- // Verify data in the data store.
-
- NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
- assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
- assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
- outerList.getValue() instanceof Iterable);
- Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
- assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
- entry instanceof MapEntryNode);
- MapEntryNode mapEntry = (MapEntryNode)entry;
- Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
- mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
- assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
- assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
-
- verifyLastApplied(shard, 2);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
- }};
- }
}
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.compat;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.verify;
-import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
-import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
-import akka.actor.ActorRef;
-import akka.dispatch.Futures;
-import akka.util.Timeout;
-import com.google.common.base.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import org.junit.Test;
-import org.mockito.ArgumentMatcher;
-import org.mockito.Mockito;
-import org.opendaylight.controller.cluster.datastore.AbstractThreePhaseCommitCohort;
-import org.opendaylight.controller.cluster.datastore.AbstractTransactionProxyTest;
-import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
-import org.opendaylight.controller.cluster.datastore.TransactionProxy;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
-import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.MergeData;
-import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.WriteData;
-import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
-import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import scala.concurrent.Future;
-
-/**
- * Unit tests for backwards compatibility with pre-Lithium versions.
- *
- * @author Thomas Pantelis
- */
-public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest {
-
- private static WriteData eqLegacyWriteData(final NormalizedNode<?, ?> nodeToWrite) {
- ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
- @Override
- public boolean matches(Object argument) {
- if(ShardTransactionMessages.WriteData.class.equals(argument.getClass())) {
- WriteData obj = WriteData.fromSerializable(argument);
- return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite);
- }
-
- return false;
- }
- };
-
- return argThat(matcher);
- }
-
- private static MergeData eqLegacyMergeData(final NormalizedNode<?, ?> nodeToWrite) {
- ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
- @Override
- public boolean matches(Object argument) {
- if(ShardTransactionMessages.MergeData.class.equals(argument.getClass())) {
- MergeData obj = MergeData.fromSerializable(argument);
- return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite);
- }
-
- return false;
- }
- };
-
- return argThat(matcher);
- }
-
- private static DeleteData eqLegacyDeleteData(final YangInstanceIdentifier expPath) {
- ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
- @Override
- public boolean matches(Object argument) {
- return ShardTransactionMessages.DeleteData.class.equals(argument.getClass()) &&
- DeleteData.fromSerializable(argument).getPath().equals(expPath);
- }
- };
-
- return argThat(matcher);
- }
-
- private static CanCommitTransaction eqCanCommitTransaction(final String transactionID) {
- ArgumentMatcher<CanCommitTransaction> matcher = new ArgumentMatcher<CanCommitTransaction>() {
- @Override
- public boolean matches(Object argument) {
- return ThreePhaseCommitCohortMessages.CanCommitTransaction.class.equals(argument.getClass()) &&
- CanCommitTransaction.fromSerializable(argument).getTransactionID().equals(transactionID);
- }
- };
-
- return argThat(matcher);
- }
-
- private static CommitTransaction eqCommitTransaction(final String transactionID) {
- ArgumentMatcher<CommitTransaction> matcher = new ArgumentMatcher<CommitTransaction>() {
- @Override
- public boolean matches(Object argument) {
- return ThreePhaseCommitCohortMessages.CommitTransaction.class.equals(argument.getClass()) &&
- CommitTransaction.fromSerializable(argument).getTransactionID().equals(transactionID);
- }
- };
-
- return argThat(matcher);
- }
-
- private static Future<Object> readySerializedTxReply(String path, short version) {
- return Futures.successful(new ReadyTransactionReply(path, version).toSerializable());
- }
-
- private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, version,
- DefaultShardStrategy.DEFAULT_SHARD);
-
- NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedReadData(TestModel.TEST_PATH));
-
- doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext).
- executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode));
-
- doReturn(Futures.successful(new MergeDataReply().toSerializable(version))).when(mockActorContext).
- executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyMergeData(testNode));
-
- doReturn(Futures.successful(new DeleteDataReply().toSerializable(version))).when(mockActorContext).
- executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyDeleteData(TestModel.TEST_PATH));
-
- doReturn(readySerializedTxReply(actorRef.path().toString(), version)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
-
- doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
- eq(actorRef.path().toString()));
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
- Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(TestModel.TEST_PATH).
- get(5, TimeUnit.SECONDS);
-
- assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
- assertEquals("Response NormalizedNode", testNode, readOptional.get());
-
- transactionProxy.write(TestModel.TEST_PATH, testNode);
-
- transactionProxy.merge(TestModel.TEST_PATH, testNode);
-
- transactionProxy.delete(TestModel.TEST_PATH);
-
- AbstractThreePhaseCommitCohort<?> proxy = transactionProxy.ready();
-
- verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
-
- doThreePhaseCommit(actorRef, transactionProxy, proxy);
-
- return actorRef;
- }
-
- private void doThreePhaseCommit(ActorRef actorRef, TransactionProxy transactionProxy,
- AbstractThreePhaseCommitCohort<?> proxy) throws InterruptedException, ExecutionException, TimeoutException {
- doReturn(Futures.successful(CanCommitTransactionReply.YES.toSerializable())).when(mockActorContext).
- executeOperationAsync(eq(actorSelection(actorRef)), eqCanCommitTransaction(
- transactionProxy.getIdentifier().toString()), any(Timeout.class));
-
- doReturn(Futures.successful(new CommitTransactionReply().toSerializable())).when(mockActorContext).
- executeOperationAsync(eq(actorSelection(actorRef)), eqCommitTransaction(
- transactionProxy.getIdentifier().toString()), any(Timeout.class));
-
- Boolean canCommit = proxy.canCommit().get(3, TimeUnit.SECONDS);
- assertEquals("canCommit", true, canCommit.booleanValue());
-
- proxy.preCommit().get(3, TimeUnit.SECONDS);
-
- proxy.commit().get(3, TimeUnit.SECONDS);
-
- verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), eqCanCommitTransaction(
- transactionProxy.getIdentifier().toString()), any(Timeout.class));
-
- verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), eqCommitTransaction(
- transactionProxy.getIdentifier().toString()), any(Timeout.class));
- }
-
- @Test
- public void testCompatibilityWithBaseHeliumVersion() throws Exception {
- ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.BASE_HELIUM_VERSION);
-
- verify(mockActorContext).resolvePath(eq(actorRef.path().toString()),
- eq(actorRef.path().toString()));
- }
-
- @Test
- public void testCompatibilityWithHeliumR1Version() throws Exception {
- ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.HELIUM_1_VERSION);
-
- verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()),
- eq(actorRef.path().toString()));
- }
-
- @Test
- public void testWriteOnlyCompatibilityWithHeliumR2Version() throws Exception {
- short version = DataStoreVersions.HELIUM_2_VERSION;
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, version,
- DefaultShardStrategy.DEFAULT_SHARD);
-
- NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext).
- executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode));
-
- doReturn(readySerializedTxReply(actorRef.path().toString(), version)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
-
- transactionProxy.write(TestModel.TEST_PATH, testNode);
-
- DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
- AbstractThreePhaseCommitCohort<?> proxy = (AbstractThreePhaseCommitCohort<?>) ready;
-
- verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
-
- doThreePhaseCommit(actorRef, transactionProxy, proxy);
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.compat;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.dispatch.Dispatchers;
-import akka.testkit.TestActorRef;
-import org.junit.Assert;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
-import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
-import org.opendaylight.controller.cluster.datastore.DatastoreContext;
-import org.opendaylight.controller.cluster.datastore.Shard;
-import org.opendaylight.controller.cluster.datastore.ShardTest;
-import org.opendaylight.controller.cluster.datastore.ShardTestKit;
-import org.opendaylight.controller.cluster.datastore.TransactionType;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.WriteData;
-import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Tests backwards compatibility support from Helium-1 to Helium.
- *
- * In Helium-1, the 3-phase commit support was moved from the ThreePhaseCommitCohort actor to the
- * Shard. As a consequence, a new transactionId field was added to the CanCommitTransaction,
- * CommitTransaction and AbortTransaction messages. With a base Helium version node, these messages
- * would be sans transactionId so this test verifies the Shard handles that properly.
- *
- * @author Thomas Pantelis
- */
-public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractActorTest {
-
- @Test
- public void testTransactionCommit() throws Exception {
- new ShardTestKit(getSystem()) {{
- SchemaContext schemaContext = TestModel.createTestContext();
- Props shardProps = Shard.builder().id(ShardIdentifier.builder().memberName("member-1").
- shardName("inventory").type("config").build()).datastoreContext(DatastoreContext.newBuilder().
- shardHeartbeatIntervalInMillis(100).build()).schemaContext(schemaContext).props().
- withDispatcher(Dispatchers.DefaultDispatcherId());
-
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), shardProps,
- "testTransactionCommit");
-
- waitUntilLeader(shard);
-
- // Send CreateTransaction message with no messages version
-
- String transactionID = "txn-1";
- shard.tell(ShardTransactionMessages.CreateTransaction.newBuilder()
- .setTransactionId(transactionID)
- .setTransactionType(TransactionType.WRITE_ONLY.ordinal())
- .setTransactionChainId("").build(), getRef());
-
- final FiniteDuration duration = duration("5 seconds");
-
- CreateTransactionReply reply = expectMsgClass(duration, CreateTransactionReply.class);
-
- ActorSelection txActor = getSystem().actorSelection(reply.getTransactionActorPath());
-
- // Write data to the Tx
-
- txActor.tell(new WriteData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.BASE_HELIUM_VERSION).
- toSerializable(), getRef());
-
- expectMsgClass(duration, ShardTransactionMessages.WriteDataReply.class);
-
- // Ready the Tx
-
- txActor.tell(new ReadyTransaction().toSerializable(), getRef());
-
- ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(expectMsgClass(
- duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
-
- ActorSelection cohortActor = getSystem().actorSelection(readyReply.getCohortPath());
-
- // Send the CanCommitTransaction message with no transactionId.
-
- cohortActor.tell(ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder().build(),
- getRef());
-
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
-
- // Send the PreCommitTransaction message with no transactionId.
-
- cohortActor.tell(ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder().build(),
- getRef());
-
- expectMsgClass(duration, PreCommitTransactionReply.SERIALIZABLE_CLASS);
-
- // Send the CommitTransaction message with no transactionId.
-
- cohortActor.tell(ThreePhaseCommitCohortMessages.CommitTransaction.newBuilder().build(),
- getRef());
-
- expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
-
- NormalizedNode<?, ?> node = ShardTest.readStore(shard, TestModel.TEST_PATH);
- Assert.assertNotNull("Data not found in store", node);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
- }};
- }
-
- @Test
- public void testTransactionAbort() throws Exception {
- new ShardTestKit(getSystem()) {{
- SchemaContext schemaContext = TestModel.createTestContext();
- Props shardProps = Shard.builder().id(ShardIdentifier.builder().memberName("member-1").
- shardName("inventory").type("config").build()).datastoreContext(DatastoreContext.newBuilder().
- shardHeartbeatIntervalInMillis(100).build()).schemaContext(schemaContext).props().
- withDispatcher(Dispatchers.DefaultDispatcherId());
-
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), shardProps,
- "testTransactionAbort");
-
- waitUntilLeader(shard);
-
- // Send CreateTransaction message with no messages version
-
- String transactionID = "txn-1";
- shard.tell(ShardTransactionMessages.CreateTransaction.newBuilder()
- .setTransactionId(transactionID)
- .setTransactionType(TransactionType.WRITE_ONLY.ordinal())
- .setTransactionChainId("").build(), getRef());
-
- final FiniteDuration duration = duration("5 seconds");
-
- CreateTransactionReply reply = expectMsgClass(duration, CreateTransactionReply.class);
-
- ActorSelection txActor = getSystem().actorSelection(reply.getTransactionActorPath());
-
- // Write data to the Tx
-
- txActor.tell(new WriteData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME),
- DataStoreVersions.BASE_HELIUM_VERSION).toSerializable(), getRef());
-
- expectMsgClass(duration, WriteDataReply.INSTANCE.toSerializable(
- DataStoreVersions.BASE_HELIUM_VERSION).getClass());
-
- // Ready the Tx
-
- txActor.tell(new ReadyTransaction().toSerializable(), getRef());
-
- ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(expectMsgClass(
- duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
-
- ActorSelection cohortActor = getSystem().actorSelection(readyReply.getCohortPath());
-
- // Send the CanCommitTransaction message with no transactionId.
-
- cohortActor.tell(ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder().build(),
- getRef());
-
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
-
- // Send the AbortTransaction message with no transactionId.
-
- cohortActor.tell(ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder().build(),
- getRef());
-
- expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
- }};
- }
-}
assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
}
- @Test
- public void testResolvePathForRemoteActor() {
- ActorContext actorContext =
- new ActorContext(getSystem(), mock(ActorRef.class), mock(
- ClusterWrapper.class),
- mock(Configuration.class));
-
- String actual = actorContext.resolvePath(
- "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
- "akka://system/user/shardmanager/shard/transaction");
-
- String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void testResolvePathForLocalActor() {
- ActorContext actorContext =
- new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class));
-
- String actual = actorContext.resolvePath(
- "akka://system/user/shardmanager/shard",
- "akka://system/user/shardmanager/shard/transaction");
-
- String expected = "akka://system/user/shardmanager/shard/transaction";
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void testResolvePathForRemoteActorWithProperRemoteAddress() {
- ActorContext actorContext =
- new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class));
-
- String actual = actorContext.resolvePath(
- "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
- "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
-
- String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
-
- assertEquals(expected, actual);
- }
-
-
@Test
public void testClientDispatcherIsGlobalDispatcher(){
ActorContext actorContext =