From: Moiz Raja Date: Sun, 2 Nov 2014 16:14:03 +0000 (-0800) Subject: BUG 2296 : TransactionProxy should support the ability to accept a local TPC actor... X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=ec7cac6937c6beacf4d6dd9b44bb998d7b0068d4 BUG 2296 : TransactionProxy should support the ability to accept a local TPC actor path In Helium when the ShardTransaction processes a Ready message it sends back a the path of a ThreePhaseCommitCohort actor in the ReadyReply. This path is actually a local actor path, this local actor path is then converted into a remote path by the TransactionProxy. The fix for Bug 1607 breaks this capability which is required to support forward compatibility in a cluster where a transaction request originates in a node that has been upgraded to Helium-1 and the actual transaction is happening on a node which has not yet been upgraded to Helium-1. Change-Id: I857384bdd61b3492ea270dcf04d14883811c37c2 Signed-off-by: Moiz Raja --- diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java index 3cd290c6de..3a1cfaa443 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java @@ -1463,6 +1463,16 @@ public final class ShardTransactionMessages { */ com.google.protobuf.ByteString getTransactionIdBytes(); + + // optional int32 messageVersion = 3; + /** + * optional int32 messageVersion = 3; + */ + boolean hasMessageVersion(); + /** + * optional int32 messageVersion = 3; + */ + int getMessageVersion(); } /** * Protobuf type {@code org.opendaylight.controller.mdsal.CreateTransactionReply} @@ -1525,6 +1535,11 @@ public final class ShardTransactionMessages { transactionId_ = input.readBytes(); break; } + case 24: { + bitField0_ |= 0x00000004; + messageVersion_ = input.readInt32(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -1651,9 +1666,26 @@ public final class ShardTransactionMessages { } } + // optional int32 messageVersion = 3; + public static final int MESSAGEVERSION_FIELD_NUMBER = 3; + private int messageVersion_; + /** + * optional int32 messageVersion = 3; + */ + public boolean hasMessageVersion() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional int32 messageVersion = 3; + */ + public int getMessageVersion() { + return messageVersion_; + } + private void initFields() { transactionActorPath_ = ""; transactionId_ = ""; + messageVersion_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -1681,6 +1713,9 @@ public final class ShardTransactionMessages { if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeBytes(2, getTransactionIdBytes()); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeInt32(3, messageVersion_); + } getUnknownFields().writeTo(output); } @@ -1698,6 +1733,10 @@ public final class ShardTransactionMessages { size += com.google.protobuf.CodedOutputStream .computeBytesSize(2, getTransactionIdBytes()); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(3, messageVersion_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -1818,6 +1857,8 @@ public final class ShardTransactionMessages { bitField0_ = (bitField0_ & ~0x00000001); transactionId_ = ""; bitField0_ = (bitField0_ & ~0x00000002); + messageVersion_ = 0; + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -1854,6 +1895,10 @@ public final class ShardTransactionMessages { to_bitField0_ |= 0x00000002; } result.transactionId_ = transactionId_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.messageVersion_ = messageVersion_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -1880,6 +1925,9 @@ public final class ShardTransactionMessages { transactionId_ = other.transactionId_; onChanged(); } + if (other.hasMessageVersion()) { + setMessageVersion(other.getMessageVersion()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -2063,6 +2111,39 @@ public final class ShardTransactionMessages { return this; } + // optional int32 messageVersion = 3; + private int messageVersion_ ; + /** + * optional int32 messageVersion = 3; + */ + public boolean hasMessageVersion() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional int32 messageVersion = 3; + */ + public int getMessageVersion() { + return messageVersion_; + } + /** + * optional int32 messageVersion = 3; + */ + public Builder setMessageVersion(int value) { + bitField0_ |= 0x00000004; + messageVersion_ = value; + onChanged(); + return this; + } + /** + * optional int32 messageVersion = 3; + */ + public Builder clearMessageVersion() { + bitField0_ = (bitField0_ & ~0x00000004); + messageVersion_ = 0; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.CreateTransactionReply) } @@ -7838,33 +7919,34 @@ public final class ShardTransactionMessages { "\n\021CreateTransaction\022\025\n\rtransactionId\030\001 \002" + "(\t\022\027\n\017transactionType\030\002 \002(\005\022\032\n\022transacti" + "onChainId\030\003 \001(\t\022\026\n\016messageVersion\030\004 \001(\005\"" + - "M\n\026CreateTransactionReply\022\034\n\024transaction" + - "ActorPath\030\001 \002(\t\022\025\n\rtransactionId\030\002 \002(\t\"\022" + - "\n\020ReadyTransaction\"*\n\025ReadyTransactionRe" + - "ply\022\021\n\tactorPath\030\001 \002(\t\"l\n\nDeleteData\022^\n\037", - "instanceIdentifierPathArguments\030\001 \002(\01325." + - "org.opendaylight.controller.mdsal.Instan" + - "ceIdentifier\"\021\n\017DeleteDataReply\"j\n\010ReadD" + - "ata\022^\n\037instanceIdentifierPathArguments\030\001" + - " \002(\01325.org.opendaylight.controller.mdsal" + - ".InstanceIdentifier\"P\n\rReadDataReply\022?\n\016" + - "normalizedNode\030\001 \001(\0132\'.org.opendaylight." + - "controller.mdsal.Node\"\254\001\n\tWriteData\022^\n\037i" + - "nstanceIdentifierPathArguments\030\001 \002(\01325.o" + - "rg.opendaylight.controller.mdsal.Instanc", - "eIdentifier\022?\n\016normalizedNode\030\002 \002(\0132\'.or" + - "g.opendaylight.controller.mdsal.Node\"\020\n\016" + - "WriteDataReply\"\254\001\n\tMergeData\022^\n\037instance" + - "IdentifierPathArguments\030\001 \002(\01325.org.open" + - "daylight.controller.mdsal.InstanceIdenti" + - "fier\022?\n\016normalizedNode\030\002 \002(\0132\'.org.opend" + - "aylight.controller.mdsal.Node\"\020\n\016MergeDa" + - "taReply\"l\n\nDataExists\022^\n\037instanceIdentif" + - "ierPathArguments\030\001 \002(\01325.org.opendayligh" + - "t.controller.mdsal.InstanceIdentifier\"!\n", - "\017DataExistsReply\022\016\n\006exists\030\001 \002(\010BV\n:org." + - "opendaylight.controller.protobuff.messag" + - "es.transactionB\030ShardTransactionMessages" + "e\n\026CreateTransactionReply\022\034\n\024transaction" + + "ActorPath\030\001 \002(\t\022\025\n\rtransactionId\030\002 \002(\t\022\026" + + "\n\016messageVersion\030\003 \001(\005\"\022\n\020ReadyTransacti" + + "on\"*\n\025ReadyTransactionReply\022\021\n\tactorPath", + "\030\001 \002(\t\"l\n\nDeleteData\022^\n\037instanceIdentifi" + + "erPathArguments\030\001 \002(\01325.org.opendaylight" + + ".controller.mdsal.InstanceIdentifier\"\021\n\017" + + "DeleteDataReply\"j\n\010ReadData\022^\n\037instanceI" + + "dentifierPathArguments\030\001 \002(\01325.org.opend" + + "aylight.controller.mdsal.InstanceIdentif" + + "ier\"P\n\rReadDataReply\022?\n\016normalizedNode\030\001" + + " \001(\0132\'.org.opendaylight.controller.mdsal" + + ".Node\"\254\001\n\tWriteData\022^\n\037instanceIdentifie" + + "rPathArguments\030\001 \002(\01325.org.opendaylight.", + "controller.mdsal.InstanceIdentifier\022?\n\016n" + + "ormalizedNode\030\002 \002(\0132\'.org.opendaylight.c" + + "ontroller.mdsal.Node\"\020\n\016WriteDataReply\"\254" + + "\001\n\tMergeData\022^\n\037instanceIdentifierPathAr" + + "guments\030\001 \002(\01325.org.opendaylight.control" + + "ler.mdsal.InstanceIdentifier\022?\n\016normaliz" + + "edNode\030\002 \002(\0132\'.org.opendaylight.controll" + + "er.mdsal.Node\"\020\n\016MergeDataReply\"l\n\nDataE" + + "xists\022^\n\037instanceIdentifierPathArguments" + + "\030\001 \002(\01325.org.opendaylight.controller.mds", + "al.InstanceIdentifier\"!\n\017DataExistsReply" + + "\022\016\n\006exists\030\001 \002(\010BV\n:org.opendaylight.con" + + "troller.protobuff.messages.transactionB\030" + + "ShardTransactionMessages" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -7894,7 +7976,7 @@ public final class ShardTransactionMessages { internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_descriptor, - new java.lang.String[] { "TransactionActorPath", "TransactionId", }); + new java.lang.String[] { "TransactionActorPath", "TransactionId", "MessageVersion", }); internal_static_org_opendaylight_controller_mdsal_ReadyTransaction_descriptor = getDescriptor().getMessageTypes().get(4); internal_static_org_opendaylight_controller_mdsal_ReadyTransaction_fieldAccessorTable = new diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto b/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto index cd1132d99d..c5e4ee45c0 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto @@ -20,9 +20,9 @@ message CreateTransaction{ } message CreateTransactionReply{ -required string transactionActorPath = 1; -required string transactionId = 2; - + required string transactionActorPath = 1; + required string transactionId = 2; + optional int32 messageVersion = 3; } message ReadyTransaction{ @@ -30,7 +30,7 @@ message ReadyTransaction{ } message ReadyTransactionReply{ -required string actorPath = 1; + required string actorPath = 1; } message DeleteData { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 1bf32e7fca..5ea9b30c63 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -8,13 +8,25 @@ package org.opendaylight.controller.cluster.datastore; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.Cancellable; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import akka.japi.Creator; +import akka.persistence.RecoveryFailure; +import akka.serialization.Serialization; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.CommonConfig; import org.opendaylight.controller.cluster.common.actor.MeteringBehavior; @@ -65,25 +77,14 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import akka.actor.ActorRef; -import akka.actor.ActorSelection; -import akka.actor.Cancellable; -import akka.actor.PoisonPill; -import akka.actor.Props; -import akka.event.Logging; -import akka.event.LoggingAdapter; -import akka.japi.Creator; -import akka.persistence.RecoveryFailure; -import akka.serialization.Serialization; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; + +import javax.annotation.Nonnull; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; /** * A Shard represents a portion of the logical data tree
@@ -93,8 +94,6 @@ import com.google.protobuf.InvalidProtocolBufferException; */ public class Shard extends RaftActor { - private static final int HELIUM_1_TX_VERSION = 1; - private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable(); private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck"; @@ -391,7 +390,7 @@ public class Shard extends RaftActor { // transactionId so to maintain backwards compatibility, we create a separate cohort actor // to provide the compatible behavior. ActorRef replyActorPath = self(); - if(ready.getTxnClientVersion() < HELIUM_1_TX_VERSION) { + if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) { LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort"); replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props( ready.getTransactionID())); @@ -536,7 +535,7 @@ public class Shard extends RaftActor { private void createTransaction(CreateTransaction createTransaction) { createTransaction(createTransaction.getTransactionType(), createTransaction.getTransactionId(), createTransaction.getTransactionChainId(), - createTransaction.getClientVersion()); + createTransaction.getVersion()); } private ActorRef createTransaction(int transactionType, String remoteTransactionId, @@ -615,7 +614,7 @@ public class Shard extends RaftActor { LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ", listenerRegistration.path()); - getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()),getSelf()); + getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf()); } private ListenerRegistration subject = TestActorRef .create(getSystem(), props, @@ -107,7 +107,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_CLIENT_VERSION); + CreateTransaction.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -137,7 +137,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_CLIENT_VERSION); + CreateTransaction.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -167,7 +167,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_CLIENT_VERSION); + CreateTransaction.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -200,7 +200,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_CLIENT_VERSION); + CreateTransaction.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -238,7 +238,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_CLIENT_VERSION); + CreateTransaction.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, "testNegativeMergeTransactionReady"); @@ -271,7 +271,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_CLIENT_VERSION); + CreateTransaction.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index cc3bb55e85..c869be82d2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -80,13 +80,13 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_CLIENT_VERSION); + CreateTransaction.CURRENT_VERSION); testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO")); props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_CLIENT_VERSION); + CreateTransaction.CURRENT_VERSION); testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW")); } @@ -118,14 +118,14 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_CLIENT_VERSION); + CreateTransaction.CURRENT_VERSION); testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf( props, "testReadDataWhenDataNotFoundRO")); props = ShardTransaction.props( store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_CLIENT_VERSION); + CreateTransaction.CURRENT_VERSION); testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf( props, "testReadDataWhenDataNotFoundRW")); @@ -156,13 +156,13 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_CLIENT_VERSION); + CreateTransaction.CURRENT_VERSION); testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO")); props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_CLIENT_VERSION); + CreateTransaction.CURRENT_VERSION); testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW")); } @@ -191,13 +191,13 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_CLIENT_VERSION); + CreateTransaction.CURRENT_VERSION); testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO")); props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_CLIENT_VERSION); + CreateTransaction.CURRENT_VERSION); testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW")); } @@ -238,7 +238,7 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_CLIENT_VERSION); + CreateTransaction.CURRENT_VERSION); final ActorRef transaction = getSystem().actorOf(props, "testWriteData"); transaction.tell(new WriteData(TestModel.TEST_PATH, @@ -265,7 +265,7 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_CLIENT_VERSION); + CreateTransaction.CURRENT_VERSION); final ActorRef transaction = getSystem().actorOf(props, "testMergeData"); transaction.tell(new MergeData(TestModel.TEST_PATH, @@ -291,7 +291,7 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_CLIENT_VERSION); + CreateTransaction.CURRENT_VERSION); final ActorRef transaction = getSystem().actorOf(props, "testDeleteData"); transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef()); @@ -314,7 +314,7 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_CLIENT_VERSION); + CreateTransaction.CURRENT_VERSION); final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction"); watch(transaction); @@ -332,7 +332,7 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_CLIENT_VERSION); + CreateTransaction.CURRENT_VERSION); final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2"); watch(transaction); @@ -354,7 +354,7 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_CLIENT_VERSION); + CreateTransaction.CURRENT_VERSION); final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction"); watch(transaction); @@ -371,7 +371,7 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_CLIENT_VERSION); + CreateTransaction.CURRENT_VERSION); final TestActorRef transaction = TestActorRef.apply(props,getSystem()); transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender()); @@ -387,7 +387,7 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_CLIENT_VERSION); + CreateTransaction.CURRENT_VERSION); final ActorRef transaction = getSystem().actorOf(props, "testShardTransactionInactivity"); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 35f346f0d6..b77b0b65cf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -338,13 +338,15 @@ public class TransactionProxyTest { return getSystem().actorSelection(actorRef.path()); } - private CreateTransactionReply createTransactionReply(ActorRef actorRef){ + private CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){ return CreateTransactionReply.newBuilder() .setTransactionActorPath(actorRef.path().toString()) - .setTransactionId("txn-1").build(); + .setTransactionId("txn-1") + .setMessageVersion(transactionVersion) + .build(); } - private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) { + private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type, int transactionVersion) { ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); doReturn(actorSystem.actorSelection(actorRef.path())). when(mockActorContext).actorSelection(actorRef.path().toString()); @@ -352,7 +354,7 @@ public class TransactionProxyTest { doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))). when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); - doReturn(Futures.successful(createTransactionReply(actorRef))).when(mockActorContext). + doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext). executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())), eqCreateTransaction(memberName, type)); @@ -361,6 +363,11 @@ public class TransactionProxyTest { return actorRef; } + private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) { + return setupActorContextWithInitialCreateTransaction(actorSystem, type, CreateTransaction.CURRENT_VERSION); + } + + private void propagateReadFailedExceptionCause(CheckedFuture future) throws Throwable { @@ -835,6 +842,47 @@ public class TransactionProxyTest { verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); } + @SuppressWarnings("unchecked") + @Test + public void testReadyForwardCompatibility() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, 0); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); + + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + + doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); + + doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()), + eq(actorRef.path().toString())); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE); + + transactionProxy.read(TestModel.TEST_PATH); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + WriteDataReply.SERIALIZABLE_CLASS); + + verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + + verify(mockActorContext).resolvePath(eq(actorRef.path().toString()), + eq(actorRef.path().toString())); + } + @SuppressWarnings("unchecked") @Test public void testReadyWithRecordingOperationFailure() throws Exception { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index 60f9a2d9dc..39d337e91b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -2,6 +2,7 @@ package org.opendaylight.controller.cluster.datastore.utils; import akka.actor.ActorRef; import akka.actor.ActorSelection; +import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.UntypedActor; import akka.japi.Creator; @@ -17,7 +18,9 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; + import java.util.concurrent.TimeUnit; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -182,4 +185,51 @@ public class ActorContextTest extends AbstractActorTest{ clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2551/"); assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/")); } + + @Test + public void testResolvePathForRemoteActor() { + ActorContext actorContext = + new ActorContext(mock(ActorSystem.class), mock(ActorRef.class), mock( + ClusterWrapper.class), + mock(Configuration.class)); + + String actual = actorContext.resolvePath( + "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard", + "akka://system/user/shardmanager/shard/transaction"); + + String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction"; + + assertEquals(expected, actual); + } + + @Test + public void testResolvePathForLocalActor() { + ActorContext actorContext = + new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), + mock(Configuration.class)); + + String actual = actorContext.resolvePath( + "akka://system/user/shardmanager/shard", + "akka://system/user/shardmanager/shard/transaction"); + + String expected = "akka://system/user/shardmanager/shard/transaction"; + + assertEquals(expected, actual); + } + + @Test + public void testResolvePathForRemoteActorWithProperRemoteAddress() { + ActorContext actorContext = + new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), + mock(Configuration.class)); + + String actual = actorContext.resolvePath( + "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard", + "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction"); + + String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction"; + + assertEquals(expected, actual); + } + }