In Helium when the ShardTransaction processes a Ready message it sends back a the path of a ThreePhaseCommitCohort actor in the ReadyReply. This path is actually a local actor path, this local actor path is then converted into a remote path by the TransactionProxy.
The fix for Bug 1607 breaks this capability which is required to support forward compatibility in a cluster where a transaction request originates in a node that has been upgraded to Helium-1 and the actual transaction is happening on a node which has not yet been upgraded to Helium-1.
Change-Id: I857384bdd61b3492ea270dcf04d14883811c37c2
Signed-off-by: Moiz Raja <moraja@cisco.com>
*/
com.google.protobuf.ByteString
getTransactionIdBytes();
+
+ // optional int32 messageVersion = 3;
+ /**
+ * <code>optional int32 messageVersion = 3;</code>
+ */
+ boolean hasMessageVersion();
+ /**
+ * <code>optional int32 messageVersion = 3;</code>
+ */
+ int getMessageVersion();
}
/**
* Protobuf type {@code org.opendaylight.controller.mdsal.CreateTransactionReply}
transactionId_ = input.readBytes();
break;
}
+ case 24: {
+ bitField0_ |= 0x00000004;
+ messageVersion_ = input.readInt32();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
}
}
+ // optional int32 messageVersion = 3;
+ public static final int MESSAGEVERSION_FIELD_NUMBER = 3;
+ private int messageVersion_;
+ /**
+ * <code>optional int32 messageVersion = 3;</code>
+ */
+ public boolean hasMessageVersion() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * <code>optional int32 messageVersion = 3;</code>
+ */
+ public int getMessageVersion() {
+ return messageVersion_;
+ }
+
private void initFields() {
transactionActorPath_ = "";
transactionId_ = "";
+ messageVersion_ = 0;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeBytes(2, getTransactionIdBytes());
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeInt32(3, messageVersion_);
+ }
getUnknownFields().writeTo(output);
}
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(2, getTransactionIdBytes());
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt32Size(3, messageVersion_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
bitField0_ = (bitField0_ & ~0x00000001);
transactionId_ = "";
bitField0_ = (bitField0_ & ~0x00000002);
+ messageVersion_ = 0;
+ bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
to_bitField0_ |= 0x00000002;
}
result.transactionId_ = transactionId_;
+ if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.messageVersion_ = messageVersion_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
transactionId_ = other.transactionId_;
onChanged();
}
+ if (other.hasMessageVersion()) {
+ setMessageVersion(other.getMessageVersion());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
return this;
}
+ // optional int32 messageVersion = 3;
+ private int messageVersion_ ;
+ /**
+ * <code>optional int32 messageVersion = 3;</code>
+ */
+ public boolean hasMessageVersion() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * <code>optional int32 messageVersion = 3;</code>
+ */
+ public int getMessageVersion() {
+ return messageVersion_;
+ }
+ /**
+ * <code>optional int32 messageVersion = 3;</code>
+ */
+ public Builder setMessageVersion(int value) {
+ bitField0_ |= 0x00000004;
+ messageVersion_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional int32 messageVersion = 3;</code>
+ */
+ public Builder clearMessageVersion() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ messageVersion_ = 0;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.CreateTransactionReply)
}
"\n\021CreateTransaction\022\025\n\rtransactionId\030\001 \002" +
"(\t\022\027\n\017transactionType\030\002 \002(\005\022\032\n\022transacti" +
"onChainId\030\003 \001(\t\022\026\n\016messageVersion\030\004 \001(\005\"" +
- "M\n\026CreateTransactionReply\022\034\n\024transaction" +
- "ActorPath\030\001 \002(\t\022\025\n\rtransactionId\030\002 \002(\t\"\022" +
- "\n\020ReadyTransaction\"*\n\025ReadyTransactionRe" +
- "ply\022\021\n\tactorPath\030\001 \002(\t\"l\n\nDeleteData\022^\n\037",
- "instanceIdentifierPathArguments\030\001 \002(\01325." +
- "org.opendaylight.controller.mdsal.Instan" +
- "ceIdentifier\"\021\n\017DeleteDataReply\"j\n\010ReadD" +
- "ata\022^\n\037instanceIdentifierPathArguments\030\001" +
- " \002(\01325.org.opendaylight.controller.mdsal" +
- ".InstanceIdentifier\"P\n\rReadDataReply\022?\n\016" +
- "normalizedNode\030\001 \001(\0132\'.org.opendaylight." +
- "controller.mdsal.Node\"\254\001\n\tWriteData\022^\n\037i" +
- "nstanceIdentifierPathArguments\030\001 \002(\01325.o" +
- "rg.opendaylight.controller.mdsal.Instanc",
- "eIdentifier\022?\n\016normalizedNode\030\002 \002(\0132\'.or" +
- "g.opendaylight.controller.mdsal.Node\"\020\n\016" +
- "WriteDataReply\"\254\001\n\tMergeData\022^\n\037instance" +
- "IdentifierPathArguments\030\001 \002(\01325.org.open" +
- "daylight.controller.mdsal.InstanceIdenti" +
- "fier\022?\n\016normalizedNode\030\002 \002(\0132\'.org.opend" +
- "aylight.controller.mdsal.Node\"\020\n\016MergeDa" +
- "taReply\"l\n\nDataExists\022^\n\037instanceIdentif" +
- "ierPathArguments\030\001 \002(\01325.org.opendayligh" +
- "t.controller.mdsal.InstanceIdentifier\"!\n",
- "\017DataExistsReply\022\016\n\006exists\030\001 \002(\010BV\n:org." +
- "opendaylight.controller.protobuff.messag" +
- "es.transactionB\030ShardTransactionMessages"
+ "e\n\026CreateTransactionReply\022\034\n\024transaction" +
+ "ActorPath\030\001 \002(\t\022\025\n\rtransactionId\030\002 \002(\t\022\026" +
+ "\n\016messageVersion\030\003 \001(\005\"\022\n\020ReadyTransacti" +
+ "on\"*\n\025ReadyTransactionReply\022\021\n\tactorPath",
+ "\030\001 \002(\t\"l\n\nDeleteData\022^\n\037instanceIdentifi" +
+ "erPathArguments\030\001 \002(\01325.org.opendaylight" +
+ ".controller.mdsal.InstanceIdentifier\"\021\n\017" +
+ "DeleteDataReply\"j\n\010ReadData\022^\n\037instanceI" +
+ "dentifierPathArguments\030\001 \002(\01325.org.opend" +
+ "aylight.controller.mdsal.InstanceIdentif" +
+ "ier\"P\n\rReadDataReply\022?\n\016normalizedNode\030\001" +
+ " \001(\0132\'.org.opendaylight.controller.mdsal" +
+ ".Node\"\254\001\n\tWriteData\022^\n\037instanceIdentifie" +
+ "rPathArguments\030\001 \002(\01325.org.opendaylight.",
+ "controller.mdsal.InstanceIdentifier\022?\n\016n" +
+ "ormalizedNode\030\002 \002(\0132\'.org.opendaylight.c" +
+ "ontroller.mdsal.Node\"\020\n\016WriteDataReply\"\254" +
+ "\001\n\tMergeData\022^\n\037instanceIdentifierPathAr" +
+ "guments\030\001 \002(\01325.org.opendaylight.control" +
+ "ler.mdsal.InstanceIdentifier\022?\n\016normaliz" +
+ "edNode\030\002 \002(\0132\'.org.opendaylight.controll" +
+ "er.mdsal.Node\"\020\n\016MergeDataReply\"l\n\nDataE" +
+ "xists\022^\n\037instanceIdentifierPathArguments" +
+ "\030\001 \002(\01325.org.opendaylight.controller.mds",
+ "al.InstanceIdentifier\"!\n\017DataExistsReply" +
+ "\022\016\n\006exists\030\001 \002(\010BV\n:org.opendaylight.con" +
+ "troller.protobuff.messages.transactionB\030" +
+ "ShardTransactionMessages"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_descriptor,
- new java.lang.String[] { "TransactionActorPath", "TransactionId", });
+ new java.lang.String[] { "TransactionActorPath", "TransactionId", "MessageVersion", });
internal_static_org_opendaylight_controller_mdsal_ReadyTransaction_descriptor =
getDescriptor().getMessageTypes().get(4);
internal_static_org_opendaylight_controller_mdsal_ReadyTransaction_fieldAccessorTable = new
}
message CreateTransactionReply{
-required string transactionActorPath = 1;
-required string transactionId = 2;
-
+ required string transactionActorPath = 1;
+ required string transactionId = 2;
+ optional int32 messageVersion = 3;
}
message ReadyTransaction{
}
message ReadyTransactionReply{
-required string actorPath = 1;
+ required string actorPath = 1;
}
message DeleteData {
package org.opendaylight.controller.cluster.datastore;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Cancellable;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.japi.Creator;
+import akka.persistence.RecoveryFailure;
+import akka.serialization.Serialization;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.Cancellable;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import akka.japi.Creator;
-import akka.persistence.RecoveryFailure;
-import akka.serialization.Serialization;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
+
+import javax.annotation.Nonnull;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
/**
* A Shard represents a portion of the logical data tree <br/>
*/
public class Shard extends RaftActor {
- private static final int HELIUM_1_TX_VERSION = 1;
-
private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable();
private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
// transactionId so to maintain backwards compatibility, we create a separate cohort actor
// to provide the compatible behavior.
ActorRef replyActorPath = self();
- if(ready.getTxnClientVersion() < HELIUM_1_TX_VERSION) {
+ if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) {
LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
ready.getTransactionID()));
private void createTransaction(CreateTransaction createTransaction) {
createTransaction(createTransaction.getTransactionType(),
createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
- createTransaction.getClientVersion());
+ createTransaction.getVersion());
}
private ActorRef createTransaction(int transactionType, String remoteTransactionId,
LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
listenerRegistration.path());
- getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()),getSelf());
+ getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
}
private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
createSnapshotTransaction = createTransaction(
TransactionProxy.TransactionType.READ_ONLY.ordinal(),
"createSnapshot" + ++createSnapshotTransactionCounter, "",
- CreateTransaction.CURRENT_CLIENT_VERSION);
+ CreateTransaction.CURRENT_VERSION);
createSnapshotTransaction.tell(
new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(),
schemaContext, datastoreContext, shardStats,
createTransaction.getTransactionId(),
- createTransaction.getClientVersion()), transactionName);
+ createTransaction.getVersion()), transactionName);
} else if (createTransaction.getTransactionType() ==
TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
return getContext().actorOf(
ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(),
schemaContext, datastoreContext, shardStats,
createTransaction.getTransactionId(),
- createTransaction.getClientVersion()), transactionName);
+ createTransaction.getVersion()), transactionName);
} else if (createTransaction.getTransactionType() ==
TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
return getContext().actorOf(
ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(),
schemaContext, datastoreContext, shardStats,
createTransaction.getTransactionId(),
- createTransaction.getClientVersion()), transactionName);
+ createTransaction.getVersion()), transactionName);
} else {
throw new IllegalArgumentException (
"CreateTransaction message has unidentified transaction type=" +
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.concurrent.GuardedBy;
/**
* TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
// Check if TxActor is created in the same node
boolean isTxActorLocal = actorContext.isLocalPath(transactionPath);
- transactionContext = new TransactionContextImpl(transactionActor, identifier,
- actorContext, schemaContext, isTxActorLocal);
+ transactionContext = new TransactionContextImpl(transactionPath, transactionActor, identifier,
+ actorContext, schemaContext, isTxActorLocal, reply.getVersion());
}
}
private final ActorContext actorContext;
private final SchemaContext schemaContext;
+ private final String transactionPath;
private final ActorSelection actor;
private final boolean isTxActorLocal;
+ private final int remoteTransactionVersion;
- private TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
+ private TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
ActorContext actorContext, SchemaContext schemaContext,
- boolean isTxActorLocal) {
+ boolean isTxActorLocal, int remoteTransactionVersion) {
super(identifier);
+ this.transactionPath = transactionPath;
this.actor = actor;
this.actorContext = actorContext;
this.schemaContext = schemaContext;
this.isTxActorLocal = isTxActorLocal;
+ this.remoteTransactionVersion = remoteTransactionVersion;
}
private ActorSelection getActor() {
} else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
- return actorContext.actorSelection(reply.getCohortPath());
+ String cohortPath = reply.getCohortPath();
+
+ // In Helium we used to return the local path of the actor which represented
+ // a remote ThreePhaseCommitCohort. The local path would then be converted to
+ // a remote path using this resolvePath method. To maintain compatibility with
+ // a Helium node we need to continue to do this conversion.
+ // At some point in the future when upgrades from Helium are not supported
+ // we could remove this code to resolvePath and just use the cohortPath as the
+ // resolved cohortPath
+ if(TransactionContextImpl.this.remoteTransactionVersion < CreateTransaction.HELIUM_1_VERSION) {
+ cohortPath = actorContext.resolvePath(transactionPath, cohortPath);
+ }
+
+ return actorContext.actorSelection(cohortPath);
} else {
// Throwing an exception here will fail the Future.
public static final Class<ShardTransactionMessages.CreateTransaction> SERIALIZABLE_CLASS =
ShardTransactionMessages.CreateTransaction.class;
- public static final int CURRENT_CLIENT_VERSION = 1;
+ public static final int HELIUM_1_VERSION = 1;
+ public static final int CURRENT_VERSION = HELIUM_1_VERSION;
private final String transactionId;
private final int transactionType;
private final String transactionChainId;
- private final int clientVersion;
+ private final int version;
public CreateTransaction(String transactionId, int transactionType) {
this(transactionId, transactionType, "");
}
public CreateTransaction(String transactionId, int transactionType, String transactionChainId) {
- this(transactionId, transactionType, transactionChainId, CURRENT_CLIENT_VERSION);
+ this(transactionId, transactionType, transactionChainId, CURRENT_VERSION);
}
private CreateTransaction(String transactionId, int transactionType, String transactionChainId,
- int clientVersion) {
+ int version) {
this.transactionId = transactionId;
this.transactionType = transactionType;
this.transactionChainId = transactionChainId;
- this.clientVersion = clientVersion;
+ this.version = version;
}
public String getTransactionId() {
return transactionType;
}
- public int getClientVersion() {
- return clientVersion;
+ public int getVersion() {
+ return version;
}
@Override
.setTransactionId(transactionId)
.setTransactionType(transactionType)
.setTransactionChainId(transactionChainId)
- .setMessageVersion(clientVersion).build();
+ .setMessageVersion(version).build();
}
public static CreateTransaction fromSerializable(Object message) {
public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransactionReply.class;
private final String transactionPath;
private final String transactionId;
+ private final int version;
public CreateTransactionReply(String transactionPath,
String transactionId) {
+ this(transactionPath, transactionId, CreateTransaction.CURRENT_VERSION);
+ }
+
+ public CreateTransactionReply(String transactionPath,
+ String transactionId, int version) {
this.transactionPath = transactionPath;
this.transactionId = transactionId;
+ this.version = version;
}
+
public String getTransactionPath() {
return transactionPath;
}
return transactionId;
}
+ public int getVersion() {
+ return version;
+ }
+
public Object toSerializable(){
return ShardTransactionMessages.CreateTransactionReply.newBuilder()
.setTransactionActorPath(transactionPath)
.setTransactionId(transactionId)
+ .setMessageVersion(version)
.build();
}
public static CreateTransactionReply fromSerializable(Object serializable){
ShardTransactionMessages.CreateTransactionReply o = (ShardTransactionMessages.CreateTransactionReply) serializable;
- return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId());
+ return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId(), o.getMessageVersion());
}
}
private final String cohortPath;
public ReadyTransactionReply(String cohortPath) {
-
this.cohortPath = cohortPath;
}
@Override
public ShardTransactionMessages.ReadyTransactionReply toSerializable() {
- return ShardTransactionMessages.ReadyTransactionReply.newBuilder().
- setActorPath(cohortPath).build();
+ return ShardTransactionMessages.ReadyTransactionReply.newBuilder()
+ .setActorPath(cohortPath)
+ .build();
}
public static ReadyTransactionReply fromSerializable(Object serializable) {
return hostPort1.equals(hostPort2);
}
+
+ /**
+ * @deprecated This method is present only to support backward compatibility with Helium and should not be
+ * used any further
+ *
+ *
+ * @param primaryPath
+ * @param localPathOfRemoteActor
+ * @return
+ */
+ @Deprecated
+ public String resolvePath(final String primaryPath,
+ final String localPathOfRemoteActor) {
+ StringBuilder builder = new StringBuilder();
+ String[] primaryPathElements = primaryPath.split("/");
+ builder.append(primaryPathElements[0]).append("//")
+ .append(primaryPathElements[1]).append(primaryPathElements[2]);
+ String[] remotePathElements = localPathOfRemoteActor.split("/");
+ for (int i = 3; i < remotePathElements.length; i++) {
+ builder.append("/").append(remotePathElements[i]);
+ }
+
+ return builder.toString();
+ }
}
package org.opendaylight.controller.cluster.datastore;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
-import static org.opendaylight.controller.cluster.datastore.messages.CreateTransaction.CURRENT_CLIENT_VERSION;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
+import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.dispatch.Dispatchers;
+import akka.dispatch.OnComplete;
+import akka.japi.Creator;
+import akka.pattern.Patterns;
+import akka.testkit.TestActorRef;
+import akka.util.Timeout;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
-import akka.actor.ActorRef;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.dispatch.Dispatchers;
-import akka.dispatch.OnComplete;
-import akka.japi.Creator;
-import akka.pattern.Patterns;
-import akka.testkit.TestActorRef;
-import akka.util.Timeout;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.Uninterruptibles;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.opendaylight.controller.cluster.datastore.messages.CreateTransaction.CURRENT_VERSION;
public class ShardTest extends AbstractActorTest {
// Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION,
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
cohort1, modification1, true), getRef());
ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
// Send the ForwardedReadyTransaction for the next 2 Tx's.
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION,
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
cohort2, modification2, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_CLIENT_VERSION,
+ shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
cohort3, modification3, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Simulate the ForwardedReadyTransaction messages that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION,
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
cohort1, modification1, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION,
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
cohort2, modification2, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Simulate the ForwardedReadyTransaction messages that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_CLIENT_VERSION,
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
cohort, modification, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Simulate the ForwardedReadyTransaction messages that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_CLIENT_VERSION,
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
cohort, modification, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
modification, preCommit);
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_CLIENT_VERSION,
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
cohort, modification, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Ready the Tx's
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION,
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
cohort1, modification1, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION,
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
cohort2, modification2, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Ready the Tx's
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION,
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
cohort1, modification1, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION,
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
cohort2, modification2, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_CLIENT_VERSION,
+ shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
cohort3, modification3, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Simulate the ForwardedReadyTransaction messages that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION,
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
cohort1, modification1, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION,
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
cohort2, modification2, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_CLIENT_VERSION);
+ CreateTransaction.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_CLIENT_VERSION);
+ CreateTransaction.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_CLIENT_VERSION);
+ CreateTransaction.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_CLIENT_VERSION);
+ CreateTransaction.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_CLIENT_VERSION);
+ CreateTransaction.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_CLIENT_VERSION);
+ CreateTransaction.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props, "testNegativeMergeTransactionReady");
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_CLIENT_VERSION);
+ CreateTransaction.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_CLIENT_VERSION);
+ CreateTransaction.CURRENT_VERSION);
testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO"));
props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_CLIENT_VERSION);
+ CreateTransaction.CURRENT_VERSION);
testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
}
final ActorRef shard = createShard();
Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_CLIENT_VERSION);
+ CreateTransaction.CURRENT_VERSION);
testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
props, "testReadDataWhenDataNotFoundRO"));
props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_CLIENT_VERSION);
+ CreateTransaction.CURRENT_VERSION);
testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
props, "testReadDataWhenDataNotFoundRW"));
final ActorRef shard = createShard();
Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_CLIENT_VERSION);
+ CreateTransaction.CURRENT_VERSION);
testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO"));
props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_CLIENT_VERSION);
+ CreateTransaction.CURRENT_VERSION);
testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
}
final ActorRef shard = createShard();
Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_CLIENT_VERSION);
+ CreateTransaction.CURRENT_VERSION);
testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO"));
props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_CLIENT_VERSION);
+ CreateTransaction.CURRENT_VERSION);
testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
}
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_CLIENT_VERSION);
+ CreateTransaction.CURRENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testWriteData");
transaction.tell(new WriteData(TestModel.TEST_PATH,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_CLIENT_VERSION);
+ CreateTransaction.CURRENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testMergeData");
transaction.tell(new MergeData(TestModel.TEST_PATH,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_CLIENT_VERSION);
+ CreateTransaction.CURRENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
final ActorRef shard = createShard();
final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_CLIENT_VERSION);
+ CreateTransaction.CURRENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
watch(transaction);
final ActorRef shard = createShard();
final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_CLIENT_VERSION);
+ CreateTransaction.CURRENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
watch(transaction);
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_CLIENT_VERSION);
+ CreateTransaction.CURRENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
watch(transaction);
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_CLIENT_VERSION);
+ CreateTransaction.CURRENT_VERSION);
final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_CLIENT_VERSION);
+ CreateTransaction.CURRENT_VERSION);
final ActorRef transaction =
getSystem().actorOf(props, "testShardTransactionInactivity");
return getSystem().actorSelection(actorRef.path());
}
- private CreateTransactionReply createTransactionReply(ActorRef actorRef){
+ private CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){
return CreateTransactionReply.newBuilder()
.setTransactionActorPath(actorRef.path().toString())
- .setTransactionId("txn-1").build();
+ .setTransactionId("txn-1")
+ .setMessageVersion(transactionVersion)
+ .build();
}
- private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
+ private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type, int transactionVersion) {
ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
doReturn(actorSystem.actorSelection(actorRef.path())).
when(mockActorContext).actorSelection(actorRef.path().toString());
doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
- doReturn(Futures.successful(createTransactionReply(actorRef))).when(mockActorContext).
+ doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext).
executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
eqCreateTransaction(memberName, type));
return actorRef;
}
+ private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
+ return setupActorContextWithInitialCreateTransaction(actorSystem, type, CreateTransaction.CURRENT_VERSION);
+ }
+
+
private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
throws Throwable {
verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadyForwardCompatibility() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, 0);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedReadData());
+
+ doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+
+ doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+
+ doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
+ eq(actorRef.path().toString()));
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_WRITE);
+
+ transactionProxy.read(TestModel.TEST_PATH);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ WriteDataReply.SERIALIZABLE_CLASS);
+
+ verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+
+ verify(mockActorContext).resolvePath(eq(actorRef.path().toString()),
+ eq(actorRef.path().toString()));
+ }
+
@SuppressWarnings("unchecked")
@Test
public void testReadyWithRecordingOperationFailure() throws Exception {
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Creator;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
+
import java.util.concurrent.TimeUnit;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2551/");
assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/"));
}
+
+ @Test
+ public void testResolvePathForRemoteActor() {
+ ActorContext actorContext =
+ new ActorContext(mock(ActorSystem.class), mock(ActorRef.class), mock(
+ ClusterWrapper.class),
+ mock(Configuration.class));
+
+ String actual = actorContext.resolvePath(
+ "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
+ "akka://system/user/shardmanager/shard/transaction");
+
+ String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testResolvePathForLocalActor() {
+ ActorContext actorContext =
+ new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+ mock(Configuration.class));
+
+ String actual = actorContext.resolvePath(
+ "akka://system/user/shardmanager/shard",
+ "akka://system/user/shardmanager/shard/transaction");
+
+ String expected = "akka://system/user/shardmanager/shard/transaction";
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testResolvePathForRemoteActorWithProperRemoteAddress() {
+ ActorContext actorContext =
+ new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+ mock(Configuration.class));
+
+ String actual = actorContext.resolvePath(
+ "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
+ "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
+
+ String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
+
+ assertEquals(expected, actual);
+ }
+
}