} else {
RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextWrapper,
parent, shardName);
- remote.setPrimaryShard(primaryShardInfo.getPrimaryShardActor(), primaryShardInfo.getPrimaryShardVersion());
+ remote.setPrimaryShard(primaryShardInfo);
}
} finally {
onTransactionContextCreated(parent.getIdentifier());
/**
* The target primary shard.
*/
- private volatile ActorSelection primaryShard;
+ private volatile PrimaryShardInfo primaryShardInfo;
/**
* The total timeout for creating a tx on the primary shard.
/**
* Sets the target primary shard and initiates a CreateTransaction try.
*/
- void setPrimaryShard(ActorSelection primaryShard, short primaryVersion) {
- this.primaryShard = primaryShard;
+ void setPrimaryShard(PrimaryShardInfo primaryShardInfo) {
+ this.primaryShardInfo = primaryShardInfo;
if (getTransactionType() == TransactionType.WRITE_ONLY &&
getActorContext().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
+ ActorSelection primaryShard = primaryShardInfo.getPrimaryShardActor();
+
LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
getIdentifier(), primaryShard);
// For write-only Tx's we prepare the transaction modifications directly on the shard actor
// to avoid the overhead of creating a separate transaction actor.
- transactionContextWrapper.executePriorTransactionOperations(createValidTransactionContext(this.primaryShard,
- this.primaryShard.path().toString(), primaryVersion));
+ transactionContextWrapper.executePriorTransactionOperations(createValidTransactionContext(
+ primaryShard, primaryShard.path().toString(), primaryShardInfo.getPrimaryShardVersion()));
} else {
tryCreateTransaction();
}
*/
private void tryCreateTransaction() {
if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), primaryShard);
+ LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(),
+ primaryShardInfo.getPrimaryShardActor());
}
Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(),
- getTransactionType().ordinal(), getIdentifier().getChainId()).toSerializable();
+ getTransactionType().ordinal(), getIdentifier().getChainId(),
+ primaryShardInfo.getPrimaryShardVersion()).toSerializable();
- Future<Object> createTxFuture = getActorContext().executeOperationAsync(primaryShard,
- serializedCreateMessage, createTxMessageTimeout);
+ Future<Object> createTxFuture = getActorContext().executeOperationAsync(
+ primaryShardInfo.getPrimaryShardActor(), serializedCreateMessage, createTxMessageTimeout);
createTxFuture.onComplete(new OnComplete<Object>() {
@Override
private void tryFindPrimaryShard() {
LOG.debug("Tx {} Retrying findPrimaryShardAsync for shard {}", getIdentifier(), shardName);
- this.primaryShard = null;
+ this.primaryShardInfo = null;
Future<PrimaryShardInfo> findPrimaryFuture = getActorContext().findPrimaryShardAsync(shardName);
findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
@Override
private void onFindPrimaryShardComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
if (failure == null) {
- this.primaryShard = primaryShardInfo.getPrimaryShardActor();
+ this.primaryShardInfo = primaryShardInfo;
tryCreateTransaction();
} else {
LOG.debug("Tx {}: Find primary for shard {} failed", getIdentifier(), shardName, failure);
private void onCreateTransactionComplete(Throwable failure, Object response) {
// An AskTimeoutException will occur if the local shard forwards to an unavailable remote leader or
// the cached remote leader actor is no longer available.
- boolean retryCreateTransaction = this.primaryShard != null &&
+ boolean retryCreateTransaction = primaryShardInfo != null &&
(failure instanceof NoShardLeaderException || failure instanceof AskTimeoutException);
if(retryCreateTransaction) {
// Schedule a retry unless we're out of retries. Note: totalCreateTxTimeout is volatile as it may
}
localTransactionContext = new NoOpTransactionContext(resultingEx, getIdentifier());
- } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
+ } else if (CreateTransactionReply.isSerializedType(response)) {
localTransactionContext = createValidTransactionContext(
CreateTransactionReply.fromSerializable(response));
} else {
LOG.debug("Tx {} Received {}", getIdentifier(), reply);
return createValidTransactionContext(getActorContext().actorSelection(reply.getTransactionPath()),
- reply.getTransactionPath(), reply.getVersion());
+ reply.getTransactionPath(), primaryShardInfo.getPrimaryShardVersion());
}
private TransactionContext createValidTransactionContext(ActorSelection transactionActor, String transactionPath,
}
try {
- if (CreateTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
+ if (CreateTransaction.isSerializedType(message)) {
handleCreateTransaction(message);
} else if (BatchedModifications.class.isInstance(message)) {
handleBatchedModifications((BatchedModifications)message);
createTransaction.getVersion());
getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor),
- createTransaction.getTransactionId()).toSerializable(), getSelf());
+ createTransaction.getTransactionId(), createTransaction.getVersion()).toSerializable(), getSelf());
} catch (Exception e) {
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import com.google.common.base.Preconditions;
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.japi.Creator;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-
-/**
- * The ShardTransactionChain Actor represents a remote TransactionChain
- */
-public class ShardTransactionChain extends AbstractUntypedActor {
-
- private final ShardDataTreeTransactionChain chain;
- private final DatastoreContext datastoreContext;
- private final ShardStats shardStats;
-
- public ShardTransactionChain(ShardDataTreeTransactionChain chain, DatastoreContext datastoreContext,
- ShardStats shardStats) {
- this.chain = Preconditions.checkNotNull(chain);
- this.datastoreContext = datastoreContext;
- this.shardStats = shardStats;
- }
-
- @Override
- public void handleReceive(Object message) throws Exception {
- if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
- CreateTransaction createTransaction = CreateTransaction.fromSerializable( message);
- createTransaction(createTransaction);
- } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)) {
- chain.close();
- getSender().tell(CloseTransactionChainReply.INSTANCE.toSerializable(), getSelf());
- }else{
- unknownMessage(message);
- }
- }
-
- private ActorRef getShardActor(){
- return getContext().parent();
- }
-
- private ActorRef createTypedTransactionActor(CreateTransaction createTransaction) {
- String transactionName = "shard-" + createTransaction.getTransactionId();
-
- final TransactionType type = TransactionType.fromInt(createTransaction.getTransactionType());
- final AbstractShardDataTreeTransaction<?> transaction;
- switch (type) {
- case READ_ONLY:
- transaction = chain.newReadOnlyTransaction(transactionName);
- break;
- case READ_WRITE:
- case WRITE_ONLY:
- transaction = chain.newReadWriteTransaction(transactionName);
- break;
- default:
- throw new IllegalArgumentException("Unhandled transaction type " + type);
- }
-
- return getContext().actorOf(
- ShardTransaction.props(type, transaction, getShardActor(),
- datastoreContext, shardStats, createTransaction.getTransactionId(),
- createTransaction.getVersion()), transactionName);
- }
-
- private void createTransaction(CreateTransaction createTransaction) {
-
- ActorRef transactionActor = createTypedTransactionActor(createTransaction);
- getSender().tell(new CreateTransactionReply(transactionActor.path().toString(),
- createTransaction.getTransactionId()).toSerializable(), getSelf());
- }
-
- public static Props props(ShardDataTreeTransactionChain chain, SchemaContext schemaContext,
- DatastoreContext datastoreContext, ShardStats shardStats) {
- return Props.create(new ShardTransactionChainCreator(chain, datastoreContext, shardStats));
- }
-
- private static class ShardTransactionChainCreator implements Creator<ShardTransactionChain> {
- private static final long serialVersionUID = 1L;
-
- final ShardDataTreeTransactionChain chain;
- final DatastoreContext datastoreContext;
- final ShardStats shardStats;
-
- ShardTransactionChainCreator(ShardDataTreeTransactionChain chain, DatastoreContext datastoreContext,
- ShardStats shardStats) {
- this.chain = chain;
- this.datastoreContext = datastoreContext;
- this.shardStats = shardStats;
- }
-
- @Override
- public ShardTransactionChain create() throws Exception {
- return new ShardTransactionChain(chain, datastoreContext, shardStats);
- }
- }
-}
package org.opendaylight.controller.cluster.datastore.messages;
-
import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+public class CreateTransaction extends VersionedExternalizableMessage {
+ private static final long serialVersionUID = 1L;
-public class CreateTransaction implements SerializableMessage {
- public static final Class<ShardTransactionMessages.CreateTransaction> SERIALIZABLE_CLASS =
- ShardTransactionMessages.CreateTransaction.class;
-
- private final String transactionId;
- private final int transactionType;
- private final String transactionChainId;
- private final short version;
+ private String transactionId;
+ private int transactionType;
+ private String transactionChainId;
- public CreateTransaction(String transactionId, int transactionType) {
- this(transactionId, transactionType, "");
+ public CreateTransaction() {
}
- public CreateTransaction(String transactionId, int transactionType, String transactionChainId) {
- this(transactionId, transactionType, transactionChainId, DataStoreVersions.CURRENT_VERSION);
- }
-
- private CreateTransaction(String transactionId, int transactionType, String transactionChainId,
+ public CreateTransaction(String transactionId, int transactionType, String transactionChainId,
short version) {
+ super(version);
this.transactionId = Preconditions.checkNotNull(transactionId);
this.transactionType = transactionType;
- this.transactionChainId = transactionChainId;
- this.version = version;
+ this.transactionChainId = transactionChainId != null ? transactionChainId : "";
}
public String getTransactionId() {
return transactionType;
}
- public short getVersion() {
- return version;
+ public String getTransactionChainId() {
+ return transactionChainId;
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ super.readExternal(in);
+ transactionId = in.readUTF();
+ transactionType = in.readInt();
+ transactionChainId = in.readUTF();
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ super.writeExternal(out);
+ out.writeUTF(transactionId);
+ out.writeInt(transactionType);
+ out.writeUTF(transactionChainId);
}
@Override
public Object toSerializable() {
- return ShardTransactionMessages.CreateTransaction.newBuilder()
- .setTransactionId(transactionId)
- .setTransactionType(transactionType)
- .setTransactionChainId(transactionChainId)
- .setMessageVersion(version).build();
+ if(getVersion() >= DataStoreVersions.BORON_VERSION) {
+ return this;
+ } else {
+ return ShardTransactionMessages.CreateTransaction.newBuilder()
+ .setTransactionId(transactionId).setTransactionType(transactionType)
+ .setTransactionChainId(transactionChainId).setMessageVersion(getVersion()).build();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "CreateTransaction [transactionId=" + transactionId + ", transactionType=" + transactionType
+ + ", transactionChainId=" + transactionChainId + "]";
}
public static CreateTransaction fromSerializable(Object message) {
- ShardTransactionMessages.CreateTransaction createTransaction =
- (ShardTransactionMessages.CreateTransaction) message;
- return new CreateTransaction(createTransaction.getTransactionId(),
- createTransaction.getTransactionType(), createTransaction.getTransactionChainId(),
- (short)createTransaction.getMessageVersion());
+ if(message instanceof CreateTransaction) {
+ return (CreateTransaction)message;
+ } else {
+ ShardTransactionMessages.CreateTransaction createTransaction =
+ (ShardTransactionMessages.CreateTransaction) message;
+ return new CreateTransaction(createTransaction.getTransactionId(),
+ createTransaction.getTransactionType(), createTransaction.getTransactionChainId(),
+ (short)createTransaction.getMessageVersion());
+ }
}
- public String getTransactionChainId() {
- return transactionChainId;
+ public static boolean isSerializedType(Object message) {
+ return message instanceof CreateTransaction || message instanceof ShardTransactionMessages.CreateTransaction;
}
}
package org.opendaylight.controller.cluster.datastore.messages;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
-public class CreateTransactionReply implements SerializableMessage {
+public class CreateTransactionReply extends VersionedExternalizableMessage {
+ private static final long serialVersionUID = 1L;
- public static final Class<?> SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransactionReply.class;
- private final String transactionPath;
- private final String transactionId;
- private final short version;
+ private String transactionPath;
+ private String transactionId;
- public CreateTransactionReply(String transactionPath, String transactionId) {
- this(transactionPath, transactionId, DataStoreVersions.CURRENT_VERSION);
+ public CreateTransactionReply() {
}
- public CreateTransactionReply(final String transactionPath,
- final String transactionId, final short version) {
+ public CreateTransactionReply(final String transactionPath, final String transactionId, final short version) {
+ super(version);
this.transactionPath = transactionPath;
this.transactionId = transactionId;
- this.version = version;
}
-
public String getTransactionPath() {
return transactionPath;
}
return transactionId;
}
- public short getVersion() {
- return version;
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ super.readExternal(in);
+ transactionId = in.readUTF();
+ transactionPath = in.readUTF();
}
@Override
- public Object toSerializable(){
- return ShardTransactionMessages.CreateTransactionReply.newBuilder()
- .setTransactionActorPath(transactionPath)
- .setTransactionId(transactionId)
- .setMessageVersion(version)
- .build();
+ public void writeExternal(ObjectOutput out) throws IOException {
+ super.writeExternal(out);
+ out.writeUTF(transactionId);
+ out.writeUTF(transactionPath);
}
- public static CreateTransactionReply fromSerializable(Object serializable){
- ShardTransactionMessages.CreateTransactionReply o = (ShardTransactionMessages.CreateTransactionReply) serializable;
- return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId(),
- (short)o.getMessageVersion());
+ @Override
+ public Object toSerializable() {
+ if(getVersion() >= DataStoreVersions.BORON_VERSION) {
+ return this;
+ } else {
+ return ShardTransactionMessages.CreateTransactionReply.newBuilder().setTransactionActorPath(transactionPath)
+ .setTransactionId(transactionId).setMessageVersion(getVersion()).build();
+ }
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("CreateTransactionReply [transactionPath=").append(transactionPath).append(", transactionId=")
- .append(transactionId).append(", version=").append(version).append("]");
+ .append(transactionId).append(", version=").append(getVersion()).append("]");
return builder.toString();
}
+
+ public static CreateTransactionReply fromSerializable(Object serializable) {
+ if(serializable instanceof CreateTransactionReply) {
+ return (CreateTransactionReply)serializable;
+ } else {
+ ShardTransactionMessages.CreateTransactionReply o =
+ (ShardTransactionMessages.CreateTransactionReply) serializable;
+ return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId(),
+ (short)o.getMessageVersion());
+ }
+ }
+
+ public static boolean isSerializedType(Object message) {
+ return message instanceof CreateTransactionReply ||
+ message instanceof ShardTransactionMessages.CreateTransactionReply;
+ }
}
}
public VersionedExternalizableMessage(short version) {
- this.version = version;
+ this.version = version <= DataStoreVersions.CURRENT_VERSION ? version: DataStoreVersions.CURRENT_VERSION;
}
public short getVersion() {
import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
@Override
public boolean matches(Object argument) {
- if(CreateTransaction.SERIALIZABLE_CLASS.equals(argument.getClass())) {
+ if(CreateTransaction.class.equals(argument.getClass())) {
CreateTransaction obj = CreateTransaction.fromSerializable(argument);
return obj.getTransactionId().startsWith(memberName) &&
obj.getTransactionType() == type.ordinal();
eq(actorSelection(actorRef)), isA(ReadyLocalTransaction.class), any(Timeout.class));
}
- protected CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){
- return CreateTransactionReply.newBuilder()
- .setTransactionActorPath(actorRef.path().toString())
- .setTransactionId("txn-1")
- .setMessageVersion(transactionVersion)
- .build();
+ protected CreateTransactionReply createTransactionReply(ActorRef actorRef, short transactionVersion){
+ return new CreateTransactionReply(actorRef.path().toString(), "txn-1", transactionVersion);
}
protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) {
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
-import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import scala.concurrent.duration.FiniteDuration;
public class ShardTest extends AbstractShardTest {
- private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
-
private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
@Test
shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- shard.tell(new CreateTransaction("txn-1",
- TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
+ shard.tell(new CreateTransaction("txn-1", TransactionType.READ_ONLY.ordinal(), null,
+ DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
CreateTransactionReply.class);
- final String path = reply.getTransactionActorPath().toString();
+ final String path = reply.getTransactionPath().toString();
assertTrue("Unexpected transaction path " + path,
path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
}};
waitUntilLeader(shard);
- shard.tell(new CreateTransaction("txn-1",
- TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
- getRef());
+ shard.tell(new CreateTransaction("txn-1",TransactionType.READ_ONLY.ordinal(), "foobar",
+ DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
CreateTransactionReply.class);
- final String path = reply.getTransactionActorPath().toString();
+ final String path = reply.getTransactionPath().toString();
assertTrue("Unexpected transaction path " + path,
path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
}};
// Create a read Tx on the same chain.
- shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
- transactionChainID).toSerializable(), getRef());
+ shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal(),
+ transactionChainID, DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
- getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
+ getSystem().actorSelection(createReply.getTransactionPath()).tell(new ReadData(path), getRef());
final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
assertEquals("Read node", containerNode, readReply.getNormalizedNode());
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
String actorPath = txActorRef.path().toString();
- CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
- setTransactionId("txn-1").setTransactionActorPath(actorPath).
- setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
+ CreateTransactionReply createTransactionReply = new CreateTransactionReply(actorPath, "txn-1",
+ DataStoreVersions.CURRENT_VERSION);
doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
return dataTreeOptional;
}
- private static Optional<DataTree> createDataTree(NormalizedNode readResponse){
+ private static Optional<DataTree> createDataTree(NormalizedNode<?, ?> readResponse){
DataTree dataTree = mock(DataTree.class);
Optional<DataTree> dataTreeOptional = Optional.of(dataTree);
DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
@Test
public void testReadCompletionForLocalShard(){
- final NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
completeOperationLocal(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testExistsCompletionForLocalShard(){
- final NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
completeOperationLocal(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
+ @SuppressWarnings("unchecked")
Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
for(NormalizedNode<?,?> node : collection){
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.compat;
+
+import static org.junit.Assert.assertTrue;
+import akka.actor.ActorRef;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.ShardTestKit;
+import org.opendaylight.controller.cluster.datastore.TransactionType;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+
+/**
+ * Shard unit tests for backwards compatibility with pre-Boron versions.
+ *
+ * @author Thomas Pantelis
+ */
+public class PreBoronShardTest extends AbstractShardTest {
+
+ @Test
+ public void testCreateTransaction(){
+ new ShardTestKit(getSystem()) {{
+ final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransaction");
+
+ waitUntilLeader(shard);
+
+ shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ shard.tell(new CreateTransaction("txn-1", TransactionType.READ_ONLY.ordinal(), null,
+ DataStoreVersions.LITHIUM_VERSION).toSerializable(), getRef());
+
+ ShardTransactionMessages.CreateTransactionReply reply =
+ expectMsgClass(ShardTransactionMessages.CreateTransactionReply.class);
+
+ final String path = CreateTransactionReply.fromSerializable(reply).getTransactionPath().toString();
+ assertTrue("Unexpected transaction path " + path,
+ path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
+ }};
+ }
+}
*/
package org.opendaylight.controller.cluster.datastore.compat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.dispatch.Futures;
+import akka.util.Timeout;
import org.junit.Test;
+import org.mockito.ArgumentMatcher;
import org.opendaylight.controller.cluster.datastore.AbstractTransactionProxyTest;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.TransactionProxy;
+import org.opendaylight.controller.cluster.datastore.TransactionType;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
/**
* TransactionProxy unit tests for backwards compatibility with pre-Boron versions.
*/
public class PreBoronTransactionProxyTest extends AbstractTransactionProxyTest {
+ private CreateTransaction eqLegacyCreateTransaction(final TransactionType type) {
+ ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
+ @Override
+ public boolean matches(Object argument) {
+ if(ShardTransactionMessages.CreateTransaction.class.equals(argument.getClass())) {
+ CreateTransaction obj = CreateTransaction.fromSerializable(argument);
+ return obj.getTransactionId().startsWith(memberName) &&
+ obj.getTransactionType() == type.ordinal();
+ }
+
+ return false;
+ }
+ };
+
+ return argThat(matcher);
+ }
+
+ private CreateTransactionReply legacyCreateTransactionReply(ActorRef actorRef, int transactionVersion){
+ return CreateTransactionReply.newBuilder()
+ .setTransactionActorPath(actorRef.path().toString())
+ .setTransactionId("txn-1")
+ .setMessageVersion(transactionVersion)
+ .build();
+ }
+
+ private ActorRef setupPreBoronActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
+ TransactionType type) {
+ ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem,
+ DefaultShardStrategy.DEFAULT_SHARD, DataStoreVersions.LITHIUM_VERSION);
+
+ ActorRef txActorRef;
+ if(type == TransactionType.WRITE_ONLY) {
+ txActorRef = shardActorRef;
+ } else {
+ txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+ doReturn(actorSystem.actorSelection(txActorRef.path())).
+ when(mockActorContext).actorSelection(txActorRef.path().toString());
+
+ doReturn(Futures.successful(legacyCreateTransactionReply(txActorRef, DataStoreVersions.LITHIUM_VERSION)))
+ .when(mockActorContext).executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+ eqLegacyCreateTransaction(type), any(Timeout.class));
+ }
+
+ return txActorRef;
+
+ }
+
@Test
public void testClose() throws Exception{
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE,
- DataStoreVersions.LITHIUM_VERSION, DefaultShardStrategy.DEFAULT_SHARD);
+ ActorRef actorRef = setupPreBoronActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedReadData());
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+
+/**
+ * Unit tests for CreateTransactionReply.
+ *
+ * @author Thomas Pantelis
+ */
+public class CreateTransactionReplyTest {
+
+ @Test
+ public void testSerialization() {
+ CreateTransactionReply expected = new CreateTransactionReply("txPath", "txId", DataStoreVersions.CURRENT_VERSION);
+
+ Object serialized = expected.toSerializable();
+ assertEquals("Serialized type", CreateTransactionReply.class, serialized.getClass());
+
+ CreateTransactionReply actual = CreateTransactionReply.fromSerializable(
+ SerializationUtils.clone((Serializable) serialized));
+ assertEquals("getTransactionId", expected.getTransactionId(), actual.getTransactionId());
+ assertEquals("getTransactionPath", expected.getTransactionPath(), actual.getTransactionPath());
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
+ }
+
+ @Test
+ public void testSerializationWithPreBoronVersion() {
+ CreateTransactionReply expected = new CreateTransactionReply("txPath", "txId", DataStoreVersions.LITHIUM_VERSION);
+
+ Object serialized = expected.toSerializable();
+ assertEquals("Serialized type", ShardTransactionMessages.CreateTransactionReply.class, serialized.getClass());
+
+ CreateTransactionReply actual = CreateTransactionReply.fromSerializable(
+ SerializationUtils.clone((Serializable) serialized));
+ assertEquals("getTransactionId", expected.getTransactionId(), actual.getTransactionId());
+ assertEquals("getTransactionPath", expected.getTransactionPath(), actual.getTransactionPath());
+ assertEquals("getVersion", DataStoreVersions.LITHIUM_VERSION, actual.getVersion());
+ }
+
+ @Test
+ public void testIsSerializedType() {
+ assertEquals("isSerializedType", true, CreateTransactionReply.isSerializedType(
+ ShardTransactionMessages.CreateTransactionReply.newBuilder().setTransactionActorPath("")
+ .setTransactionId("").setMessageVersion(4).build()));
+
+ assertEquals("isSerializedType", true, CreateTransactionReply.isSerializedType(new CreateTransactionReply()));
+ assertEquals("isSerializedType", false, CreateTransactionReply.isSerializedType(new Object()));
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+
+/**
+ * Unit tests for CreateTransaction.
+ *
+ * @author Thomas Pantelis
+ */
+public class CreateTransactionTest {
+
+ @Test
+ public void testSerialization() {
+ CreateTransaction expected = new CreateTransaction("txId", 2, "chainId", DataStoreVersions.CURRENT_VERSION);
+
+ Object serialized = expected.toSerializable();
+ assertEquals("Serialized type", CreateTransaction.class, serialized.getClass());
+
+ CreateTransaction actual = CreateTransaction.fromSerializable(
+ SerializationUtils.clone((Serializable) serialized));
+ assertEquals("getTransactionId", expected.getTransactionId(), actual.getTransactionId());
+ assertEquals("getTransactionType", expected.getTransactionType(), actual.getTransactionType());
+ assertEquals("getTransactionChainId", expected.getTransactionChainId(), actual.getTransactionChainId());
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
+ }
+
+ @Test
+ public void testSerializationWithPreBoronVersion() {
+ CreateTransaction expected = new CreateTransaction("txId", 2, "chainId", DataStoreVersions.LITHIUM_VERSION);
+
+ Object serialized = expected.toSerializable();
+ assertEquals("Serialized type", ShardTransactionMessages.CreateTransaction.class, serialized.getClass());
+
+ CreateTransaction actual = CreateTransaction.fromSerializable(
+ SerializationUtils.clone((Serializable) serialized));
+ assertEquals("getTransactionId", expected.getTransactionId(), actual.getTransactionId());
+ assertEquals("getTransactionType", expected.getTransactionType(), actual.getTransactionType());
+ assertEquals("getTransactionChainId", expected.getTransactionChainId(), actual.getTransactionChainId());
+ assertEquals("getVersion", DataStoreVersions.LITHIUM_VERSION, actual.getVersion());
+ }
+
+ @Test
+ public void testSerializationWithNewerVersion() {
+ short newerVersion = DataStoreVersions.CURRENT_VERSION + (short)1;
+ CreateTransaction expected = new CreateTransaction("txId", 2, "chainId", newerVersion);
+
+ Object serialized = expected.toSerializable();
+ assertEquals("Serialized type", CreateTransaction.class, serialized.getClass());
+
+ CreateTransaction actual = CreateTransaction.fromSerializable(
+ SerializationUtils.clone((Serializable) serialized));
+ assertEquals("getTransactionId", expected.getTransactionId(), actual.getTransactionId());
+ assertEquals("getTransactionType", expected.getTransactionType(), actual.getTransactionType());
+ assertEquals("getTransactionChainId", expected.getTransactionChainId(), actual.getTransactionChainId());
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
+ }
+
+ @Test
+ public void testIsSerializedType() {
+ assertEquals("isSerializedType", true, CreateTransaction.isSerializedType(
+ ShardTransactionMessages.CreateTransaction.newBuilder().setTransactionId("")
+ .setTransactionType(2).setTransactionChainId("").setMessageVersion(4).build()));
+
+ assertEquals("isSerializedType", true, CreateTransaction.isSerializedType(new CreateTransaction()));
+ assertEquals("isSerializedType", false, CreateTransaction.isSerializedType(new Object()));
+ }
+}