Implemented as outlined in Bug 2294.
Change-Id: I14aa8ef5f320f9d165492396ece4ea63cce9b0c3
Signed-off-by: tpantelis <tpanteli@brocade.com>
public interface CanCommitTransactionOrBuilder
extends com.google.protobuf.MessageOrBuilder {
- // required string transactionId = 1;
+ // optional string transactionId = 1;
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
boolean hasTransactionId();
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
java.lang.String getTransactionId();
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
com.google.protobuf.ByteString
getTransactionIdBytes();
}
private int bitField0_;
- // required string transactionId = 1;
+ // optional string transactionId = 1;
public static final int TRANSACTIONID_FIELD_NUMBER = 1;
private java.lang.Object transactionId_;
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public boolean hasTransactionId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public java.lang.String getTransactionId() {
java.lang.Object ref = transactionId_;
}
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public com.google.protobuf.ByteString
getTransactionIdBytes() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1) return isInitialized == 1;
- if (!hasTransactionId()) {
- memoizedIsInitialized = 0;
- return false;
- }
memoizedIsInitialized = 1;
return true;
}
}
public final boolean isInitialized() {
- if (!hasTransactionId()) {
-
- return false;
- }
return true;
}
}
private int bitField0_;
- // required string transactionId = 1;
+ // optional string transactionId = 1;
private java.lang.Object transactionId_ = "";
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public boolean hasTransactionId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public java.lang.String getTransactionId() {
java.lang.Object ref = transactionId_;
}
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public com.google.protobuf.ByteString
getTransactionIdBytes() {
}
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public Builder setTransactionId(
java.lang.String value) {
return this;
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public Builder clearTransactionId() {
bitField0_ = (bitField0_ & ~0x00000001);
return this;
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public Builder setTransactionIdBytes(
com.google.protobuf.ByteString value) {
public interface AbortTransactionOrBuilder
extends com.google.protobuf.MessageOrBuilder {
- // required string transactionId = 1;
+ // optional string transactionId = 1;
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
boolean hasTransactionId();
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
java.lang.String getTransactionId();
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
com.google.protobuf.ByteString
getTransactionIdBytes();
}
private int bitField0_;
- // required string transactionId = 1;
+ // optional string transactionId = 1;
public static final int TRANSACTIONID_FIELD_NUMBER = 1;
private java.lang.Object transactionId_;
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public boolean hasTransactionId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public java.lang.String getTransactionId() {
java.lang.Object ref = transactionId_;
}
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public com.google.protobuf.ByteString
getTransactionIdBytes() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1) return isInitialized == 1;
- if (!hasTransactionId()) {
- memoizedIsInitialized = 0;
- return false;
- }
memoizedIsInitialized = 1;
return true;
}
}
public final boolean isInitialized() {
- if (!hasTransactionId()) {
-
- return false;
- }
return true;
}
}
private int bitField0_;
- // required string transactionId = 1;
+ // optional string transactionId = 1;
private java.lang.Object transactionId_ = "";
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public boolean hasTransactionId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public java.lang.String getTransactionId() {
java.lang.Object ref = transactionId_;
}
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public com.google.protobuf.ByteString
getTransactionIdBytes() {
}
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public Builder setTransactionId(
java.lang.String value) {
return this;
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public Builder clearTransactionId() {
bitField0_ = (bitField0_ & ~0x00000001);
return this;
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public Builder setTransactionIdBytes(
com.google.protobuf.ByteString value) {
public interface CommitTransactionOrBuilder
extends com.google.protobuf.MessageOrBuilder {
- // required string transactionId = 1;
+ // optional string transactionId = 1;
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
boolean hasTransactionId();
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
java.lang.String getTransactionId();
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
com.google.protobuf.ByteString
getTransactionIdBytes();
}
private int bitField0_;
- // required string transactionId = 1;
+ // optional string transactionId = 1;
public static final int TRANSACTIONID_FIELD_NUMBER = 1;
private java.lang.Object transactionId_;
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public boolean hasTransactionId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public java.lang.String getTransactionId() {
java.lang.Object ref = transactionId_;
}
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public com.google.protobuf.ByteString
getTransactionIdBytes() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1) return isInitialized == 1;
- if (!hasTransactionId()) {
- memoizedIsInitialized = 0;
- return false;
- }
memoizedIsInitialized = 1;
return true;
}
}
public final boolean isInitialized() {
- if (!hasTransactionId()) {
-
- return false;
- }
return true;
}
}
private int bitField0_;
- // required string transactionId = 1;
+ // optional string transactionId = 1;
private java.lang.Object transactionId_ = "";
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public boolean hasTransactionId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public java.lang.String getTransactionId() {
java.lang.Object ref = transactionId_;
}
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public com.google.protobuf.ByteString
getTransactionIdBytes() {
}
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public Builder setTransactionId(
java.lang.String value) {
return this;
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public Builder clearTransactionId() {
bitField0_ = (bitField0_ & ~0x00000001);
return this;
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public Builder setTransactionIdBytes(
com.google.protobuf.ByteString value) {
java.lang.String[] descriptorData = {
"\n\014Cohort.proto\022!org.opendaylight.control" +
"ler.mdsal\"-\n\024CanCommitTransaction\022\025\n\rtra" +
- "nsactionId\030\001 \002(\t\".\n\031CanCommitTransaction" +
+ "nsactionId\030\001 \001(\t\".\n\031CanCommitTransaction" +
"Reply\022\021\n\tcanCommit\030\001 \002(\010\")\n\020AbortTransac" +
- "tion\022\025\n\rtransactionId\030\001 \002(\t\"\027\n\025AbortTran" +
+ "tion\022\025\n\rtransactionId\030\001 \001(\t\"\027\n\025AbortTran" +
"sactionReply\"*\n\021CommitTransaction\022\025\n\rtra" +
- "nsactionId\030\001 \002(\t\"\030\n\026CommitTransactionRep" +
+ "nsactionId\030\001 \001(\t\"\030\n\026CommitTransactionRep" +
"ly\"\026\n\024PreCommitTransaction\"\033\n\031PreCommitT" +
"ransactionReplyBZ\n8org.opendaylight.cont" +
"roller.protobuff.messages.cohort3pcB\036Thr",
*/
com.google.protobuf.ByteString
getTransactionChainIdBytes();
+
+ // optional int32 messageVersion = 4;
+ /**
+ * <code>optional int32 messageVersion = 4;</code>
+ */
+ boolean hasMessageVersion();
+ /**
+ * <code>optional int32 messageVersion = 4;</code>
+ */
+ int getMessageVersion();
}
/**
* Protobuf type {@code org.opendaylight.controller.mdsal.CreateTransaction}
transactionChainId_ = input.readBytes();
break;
}
+ case 32: {
+ bitField0_ |= 0x00000008;
+ messageVersion_ = input.readInt32();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
}
}
+ // optional int32 messageVersion = 4;
+ public static final int MESSAGEVERSION_FIELD_NUMBER = 4;
+ private int messageVersion_;
+ /**
+ * <code>optional int32 messageVersion = 4;</code>
+ */
+ public boolean hasMessageVersion() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ /**
+ * <code>optional int32 messageVersion = 4;</code>
+ */
+ public int getMessageVersion() {
+ return messageVersion_;
+ }
+
private void initFields() {
transactionId_ = "";
transactionType_ = 0;
transactionChainId_ = "";
+ messageVersion_ = 0;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeBytes(3, getTransactionChainIdBytes());
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ output.writeInt32(4, messageVersion_);
+ }
getUnknownFields().writeTo(output);
}
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(3, getTransactionChainIdBytes());
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt32Size(4, messageVersion_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
bitField0_ = (bitField0_ & ~0x00000002);
transactionChainId_ = "";
bitField0_ = (bitField0_ & ~0x00000004);
+ messageVersion_ = 0;
+ bitField0_ = (bitField0_ & ~0x00000008);
return this;
}
to_bitField0_ |= 0x00000004;
}
result.transactionChainId_ = transactionChainId_;
+ if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+ to_bitField0_ |= 0x00000008;
+ }
+ result.messageVersion_ = messageVersion_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
transactionChainId_ = other.transactionChainId_;
onChanged();
}
+ if (other.hasMessageVersion()) {
+ setMessageVersion(other.getMessageVersion());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
return this;
}
+ // optional int32 messageVersion = 4;
+ private int messageVersion_ ;
+ /**
+ * <code>optional int32 messageVersion = 4;</code>
+ */
+ public boolean hasMessageVersion() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ /**
+ * <code>optional int32 messageVersion = 4;</code>
+ */
+ public int getMessageVersion() {
+ return messageVersion_;
+ }
+ /**
+ * <code>optional int32 messageVersion = 4;</code>
+ */
+ public Builder setMessageVersion(int value) {
+ bitField0_ |= 0x00000008;
+ messageVersion_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional int32 messageVersion = 4;</code>
+ */
+ public Builder clearMessageVersion() {
+ bitField0_ = (bitField0_ & ~0x00000008);
+ messageVersion_ = 0;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.CreateTransaction)
}
java.lang.String[] descriptorData = {
"\n\026ShardTransaction.proto\022!org.opendaylig" +
"ht.controller.mdsal\032\014Common.proto\"\022\n\020Clo" +
- "seTransaction\"\027\n\025CloseTransactionReply\"_" +
+ "seTransaction\"\027\n\025CloseTransactionReply\"w" +
"\n\021CreateTransaction\022\025\n\rtransactionId\030\001 \002" +
"(\t\022\027\n\017transactionType\030\002 \002(\005\022\032\n\022transacti" +
- "onChainId\030\003 \001(\t\"M\n\026CreateTransactionRepl" +
- "y\022\034\n\024transactionActorPath\030\001 \002(\t\022\025\n\rtrans" +
- "actionId\030\002 \002(\t\"\022\n\020ReadyTransaction\"*\n\025Re" +
- "adyTransactionReply\022\021\n\tactorPath\030\001 \002(\t\"l" +
- "\n\nDeleteData\022^\n\037instanceIdentifierPathAr",
- "guments\030\001 \002(\01325.org.opendaylight.control" +
- "ler.mdsal.InstanceIdentifier\"\021\n\017DeleteDa" +
- "taReply\"j\n\010ReadData\022^\n\037instanceIdentifie" +
- "rPathArguments\030\001 \002(\01325.org.opendaylight." +
- "controller.mdsal.InstanceIdentifier\"P\n\rR" +
- "eadDataReply\022?\n\016normalizedNode\030\001 \001(\0132\'.o" +
- "rg.opendaylight.controller.mdsal.Node\"\254\001" +
- "\n\tWriteData\022^\n\037instanceIdentifierPathArg" +
- "uments\030\001 \002(\01325.org.opendaylight.controll" +
- "er.mdsal.InstanceIdentifier\022?\n\016normalize",
- "dNode\030\002 \002(\0132\'.org.opendaylight.controlle" +
- "r.mdsal.Node\"\020\n\016WriteDataReply\"\254\001\n\tMerge" +
- "Data\022^\n\037instanceIdentifierPathArguments\030" +
- "\001 \002(\01325.org.opendaylight.controller.mdsa" +
- "l.InstanceIdentifier\022?\n\016normalizedNode\030\002" +
- " \002(\0132\'.org.opendaylight.controller.mdsal" +
- ".Node\"\020\n\016MergeDataReply\"l\n\nDataExists\022^\n" +
- "\037instanceIdentifierPathArguments\030\001 \002(\01325" +
- ".org.opendaylight.controller.mdsal.Insta" +
- "nceIdentifier\"!\n\017DataExistsReply\022\016\n\006exis",
- "ts\030\001 \002(\010BV\n:org.opendaylight.controller." +
- "protobuff.messages.transactionB\030ShardTra" +
- "nsactionMessages"
+ "onChainId\030\003 \001(\t\022\026\n\016messageVersion\030\004 \001(\005\"" +
+ "M\n\026CreateTransactionReply\022\034\n\024transaction" +
+ "ActorPath\030\001 \002(\t\022\025\n\rtransactionId\030\002 \002(\t\"\022" +
+ "\n\020ReadyTransaction\"*\n\025ReadyTransactionRe" +
+ "ply\022\021\n\tactorPath\030\001 \002(\t\"l\n\nDeleteData\022^\n\037",
+ "instanceIdentifierPathArguments\030\001 \002(\01325." +
+ "org.opendaylight.controller.mdsal.Instan" +
+ "ceIdentifier\"\021\n\017DeleteDataReply\"j\n\010ReadD" +
+ "ata\022^\n\037instanceIdentifierPathArguments\030\001" +
+ " \002(\01325.org.opendaylight.controller.mdsal" +
+ ".InstanceIdentifier\"P\n\rReadDataReply\022?\n\016" +
+ "normalizedNode\030\001 \001(\0132\'.org.opendaylight." +
+ "controller.mdsal.Node\"\254\001\n\tWriteData\022^\n\037i" +
+ "nstanceIdentifierPathArguments\030\001 \002(\01325.o" +
+ "rg.opendaylight.controller.mdsal.Instanc",
+ "eIdentifier\022?\n\016normalizedNode\030\002 \002(\0132\'.or" +
+ "g.opendaylight.controller.mdsal.Node\"\020\n\016" +
+ "WriteDataReply\"\254\001\n\tMergeData\022^\n\037instance" +
+ "IdentifierPathArguments\030\001 \002(\01325.org.open" +
+ "daylight.controller.mdsal.InstanceIdenti" +
+ "fier\022?\n\016normalizedNode\030\002 \002(\0132\'.org.opend" +
+ "aylight.controller.mdsal.Node\"\020\n\016MergeDa" +
+ "taReply\"l\n\nDataExists\022^\n\037instanceIdentif" +
+ "ierPathArguments\030\001 \002(\01325.org.opendayligh" +
+ "t.controller.mdsal.InstanceIdentifier\"!\n",
+ "\017DataExistsReply\022\016\n\006exists\030\001 \002(\010BV\n:org." +
+ "opendaylight.controller.protobuff.messag" +
+ "es.transactionB\030ShardTransactionMessages"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
internal_static_org_opendaylight_controller_mdsal_CreateTransaction_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_org_opendaylight_controller_mdsal_CreateTransaction_descriptor,
- new java.lang.String[] { "TransactionId", "TransactionType", "TransactionChainId", });
+ new java.lang.String[] { "TransactionId", "TransactionType", "TransactionChainId", "MessageVersion", });
internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_descriptor =
getDescriptor().getMessageTypes().get(3);
internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_fieldAccessorTable = new
message CanCommitTransaction{
- required string transactionId = 1;
+ optional string transactionId = 1;
}
message CanCommitTransactionReply{
}
message AbortTransaction{
- required string transactionId = 1;
+ optional string transactionId = 1;
}
message AbortTransactionReply {
}
message CommitTransaction{
- required string transactionId = 1;
+ optional string transactionId = 1;
}
message CommitTransactionReply{
required string transactionId = 1;
required int32 transactionType =2;
optional string transactionChainId = 3;
+ optional int32 messageVersion = 4;
}
message CreateTransactionReply{
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.Cancellable;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import akka.japi.Creator;
-import akka.persistence.RecoveryFailure;
-import akka.serialization.Serialization;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
+import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-
-import javax.annotation.Nonnull;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Cancellable;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.japi.Creator;
+import akka.persistence.RecoveryFailure;
+import akka.serialization.Serialization;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
/**
* A Shard represents a portion of the logical data tree <br/>
*/
public class Shard extends RaftActor {
+ private static final int HELIUM_1_TX_VERSION = 1;
+
private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable();
private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
}
private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
- LOG.debug("Readying transaction {}", ready.getTransactionID());
+ LOG.debug("Readying transaction {}, client version {}", ready.getTransactionID(),
+ ready.getTxnClientVersion());
// This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
// commitCoordinator in preparation for the subsequent three phase commit initiated by
commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
ready.getModification());
- // Return our actor path as we'll handle the three phase commit.
- ReadyTransactionReply readyTransactionReply =
- new ReadyTransactionReply(Serialization.serializedActorPath(self()));
- getSender().tell(
- ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : readyTransactionReply,
- getSelf());
+ // Return our actor path as we'll handle the three phase commit, except if the Tx client
+ // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version
+ // node. In that case, the subsequent 3-phase commit messages won't contain the
+ // transactionId so to maintain backwards compatibility, we create a separate cohort actor
+ // to provide the compatible behavior.
+ ActorRef replyActorPath = self();
+ if(ready.getTxnClientVersion() < HELIUM_1_TX_VERSION) {
+ LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
+ replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
+ ready.getTransactionID()));
+ }
+
+ ReadyTransactionReply readyTransactionReply = new ReadyTransactionReply(
+ Serialization.serializedActorPath(replyActorPath));
+ getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
+ readyTransactionReply, getSelf());
}
private void handleAbortTransaction(AbortTransaction abort) {
}
}
- private ActorRef createTypedTransactionActor(
- int transactionType,
- ShardTransactionIdentifier transactionId,
- String transactionChainId ) {
+ private ActorRef createTypedTransactionActor(int transactionType,
+ ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) {
DOMStoreTransactionFactory factory = store;
return getContext().actorOf(
ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(),
schemaContext,datastoreContext, shardMBean,
- transactionId.getRemoteTransactionId()), transactionId.toString());
+ transactionId.getRemoteTransactionId(), clientVersion),
+ transactionId.toString());
} else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
return getContext().actorOf(
ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(),
schemaContext, datastoreContext, shardMBean,
- transactionId.getRemoteTransactionId()), transactionId.toString());
+ transactionId.getRemoteTransactionId(), clientVersion),
+ transactionId.toString());
} else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
return getContext().actorOf(
ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(),
schemaContext, datastoreContext, shardMBean,
- transactionId.getRemoteTransactionId()), transactionId.toString());
+ transactionId.getRemoteTransactionId(), clientVersion),
+ transactionId.toString());
} else {
throw new IllegalArgumentException(
"Shard="+name + ":CreateTransaction message has unidentified transaction type="
private void createTransaction(CreateTransaction createTransaction) {
createTransaction(createTransaction.getTransactionType(),
- createTransaction.getTransactionId(), createTransaction.getTransactionChainId());
+ createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
+ createTransaction.getClientVersion());
}
- private ActorRef createTransaction(int transactionType, String remoteTransactionId, String transactionChainId) {
+ private ActorRef createTransaction(int transactionType, String remoteTransactionId,
+ String transactionChainId, int clientVersion) {
ShardTransactionIdentifier transactionId =
ShardTransactionIdentifier.builder()
if(LOG.isDebugEnabled()) {
LOG.debug("Creating transaction : {} ", transactionId);
}
- ActorRef transactionActor =
- createTypedTransactionActor(transactionType, transactionId, transactionChainId);
+ ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
+ transactionChainId, clientVersion);
getSender()
.tell(new CreateTransactionReply(
// so that this actor does not get block building the snapshot
createSnapshotTransaction = createTransaction(
TransactionProxy.TransactionType.READ_ONLY.ordinal(),
- "createSnapshot" + ++createSnapshotTransactionCounter, "");
+ "createSnapshot" + ++createSnapshotTransactionCounter, "",
+ CreateTransaction.CURRENT_CLIENT_VERSION);
createSnapshotTransaction.tell(
new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
private final DOMStoreReadTransaction transaction;
public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
- SchemaContext schemaContext, ShardStats shardStats, String transactionID) {
- super(shardActor, schemaContext, shardStats, transactionID);
+ SchemaContext schemaContext, ShardStats shardStats, String transactionID,
+ int txnClientVersion) {
+ super(shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
this.transaction = transaction;
}
private final DOMStoreReadWriteTransaction transaction;
public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor,
- SchemaContext schemaContext, ShardStats shardStats, String transactionID) {
- super(transaction, shardActor, schemaContext, shardStats, transactionID);
+ SchemaContext schemaContext, ShardStats shardStats, String transactionID,
+ int txnClientVersion) {
+ super(transaction, shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
this.transaction = transaction;
}
*/
public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
+ protected static final boolean SERIALIZED_REPLY = true;
+
private final ActorRef shardActor;
private final SchemaContext schemaContext;
private final ShardStats shardStats;
private final String transactionID;
- protected static final boolean SERIALIZED_REPLY = true;
+ private final int txnClientVersion;
protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
- ShardStats shardStats, String transactionID) {
+ ShardStats shardStats, String transactionID, int txnClientVersion) {
super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
this.shardActor = shardActor;
this.schemaContext = schemaContext;
this.shardStats = shardStats;
this.transactionID = transactionID;
+ this.txnClientVersion = txnClientVersion;
}
public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats,
- String transactionID) {
+ String transactionID, int txnClientVersion) {
return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
- datastoreContext, shardStats, transactionID));
+ datastoreContext, shardStats, transactionID, txnClientVersion));
}
protected abstract DOMStoreTransaction getDOMStoreTransaction();
return schemaContext;
}
+ protected int getTxnClientVersion() {
+ return txnClientVersion;
+ }
+
@Override
public void handleReceive(Object message) throws Exception {
if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) {
final DatastoreContext datastoreContext;
final ShardStats shardStats;
final String transactionID;
+ final int txnClientVersion;
ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
SchemaContext schemaContext, DatastoreContext datastoreContext,
- ShardStats shardStats, String transactionID) {
+ ShardStats shardStats, String transactionID, int txnClientVersion) {
this.transaction = transaction;
this.shardActor = shardActor;
this.shardStats = shardStats;
this.schemaContext = schemaContext;
this.datastoreContext = datastoreContext;
this.transactionID = transactionID;
+ this.txnClientVersion = txnClientVersion;
}
@Override
ShardTransaction tx;
if(transaction instanceof DOMStoreReadWriteTransaction) {
tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
- shardActor, schemaContext, shardStats, transactionID);
+ shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
} else if(transaction instanceof DOMStoreReadTransaction) {
tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
- schemaContext, shardStats, transactionID);
+ schemaContext, shardStats, transactionID, txnClientVersion);
} else {
tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
- shardActor, schemaContext, shardStats, transactionID);
+ shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
}
tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
return getContext().actorOf(
ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(),
schemaContext, datastoreContext, shardStats,
- createTransaction.getTransactionId()), transactionName);
+ createTransaction.getTransactionId(),
+ createTransaction.getClientVersion()), transactionName);
} else if (createTransaction.getTransactionType() ==
TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
return getContext().actorOf(
ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(),
schemaContext, datastoreContext, shardStats,
- createTransaction.getTransactionId()), transactionName);
+ createTransaction.getTransactionId(),
+ createTransaction.getClientVersion()), transactionName);
} else if (createTransaction.getTransactionType() ==
TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
return getContext().actorOf(
ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(),
schemaContext, datastoreContext, shardStats,
- createTransaction.getTransactionId()), transactionName);
+ createTransaction.getTransactionId(),
+ createTransaction.getClientVersion()), transactionName);
} else {
throw new IllegalArgumentException (
"CreateTransaction message has unidentified transaction type=" +
private final DOMStoreWriteTransaction transaction;
public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
- SchemaContext schemaContext, ShardStats shardStats, String transactionID) {
- super(shardActor, schemaContext, shardStats, transactionID);
+ SchemaContext schemaContext, ShardStats shardStats, String transactionID,
+ int txnClientVersion) {
+ super(shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
this.transaction = transaction;
}
DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
- getShardActor().forward(new ForwardedReadyTransaction(transactionID, cohort, modification,
- returnSerialized), getContext());
+ getShardActor().forward(new ForwardedReadyTransaction(transactionID, getTxnClientVersion(),
+ cohort, modification, returnSerialized), getContext());
// The shard will handle the commit from here so we're no longer needed - self-destruct.
getSelf().tell(PoisonPill.getInstance(), getSelf());
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.compat;
+
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.japi.Creator;
+
+/**
+ * An actor to maintain backwards compatibility for the base Helium version where the 3-phase commit
+ * messages don't contain the transactionId. This actor just forwards a new message containing the
+ * transactionId to the parent Shard.
+ *
+ * @author Thomas Pantelis
+ */
+public class BackwardsCompatibleThreePhaseCommitCohort extends AbstractUntypedActor {
+
+ private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this);
+
+ private final String transactionId;
+
+ private BackwardsCompatibleThreePhaseCommitCohort(String transactionId) {
+ this.transactionId = transactionId;
+ }
+
+ @Override
+ public void handleReceive(Object message) throws Exception {
+ if(message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
+ LOG.debug("BackwardsCompatibleThreePhaseCommitCohort CanCommitTransaction");
+
+ getContext().parent().forward(new CanCommitTransaction(transactionId).toSerializable(),
+ getContext());
+ } else if(message.getClass().equals(PreCommitTransaction.SERIALIZABLE_CLASS)) {
+ LOG.debug("BackwardsCompatibleThreePhaseCommitCohort PreCommitTransaction");
+
+ // The Shard doesn't need the PreCommitTransaction message so just return the reply here.
+ getSender().tell(new PreCommitTransactionReply().toSerializable(), self());
+ } else if(message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
+ LOG.debug("BackwardsCompatibleThreePhaseCommitCohort CommitTransaction");
+
+ getContext().parent().forward(new CommitTransaction(transactionId).toSerializable(),
+ getContext());
+
+ // We're done now - we can self-destruct
+ self().tell(PoisonPill.getInstance(), self());
+ } else if(message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
+ LOG.debug("BackwardsCompatibleThreePhaseCommitCohort AbortTransaction");
+
+ getContext().parent().forward(new AbortTransaction(transactionId).toSerializable(),
+ getContext());
+ self().tell(PoisonPill.getInstance(), self());
+ }
+ }
+
+ public static Props props(String transactionId) {
+ return Props.create(new BackwardsCompatibleThreePhaseCommitCohortCreator(transactionId));
+ }
+
+ private static class BackwardsCompatibleThreePhaseCommitCohortCreator
+ implements Creator<BackwardsCompatibleThreePhaseCommitCohort> {
+ private static final long serialVersionUID = 1L;
+
+ private final String transactionId;
+
+ BackwardsCompatibleThreePhaseCommitCohortCreator(String transactionId) {
+ this.transactionId = transactionId;
+ }
+
+ @Override
+ public BackwardsCompatibleThreePhaseCommitCohort create() throws Exception {
+ return new BackwardsCompatibleThreePhaseCommitCohort(transactionId);
+ }
+ }
+}
public class CreateTransaction implements SerializableMessage {
- public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransaction.class;
+ public static final Class<ShardTransactionMessages.CreateTransaction> SERIALIZABLE_CLASS =
+ ShardTransactionMessages.CreateTransaction.class;
+
+ public static final int CURRENT_CLIENT_VERSION = 1;
+
private final String transactionId;
private final int transactionType;
private final String transactionChainId;
+ private final int clientVersion;
public CreateTransaction(String transactionId, int transactionType) {
this(transactionId, transactionType, "");
}
public CreateTransaction(String transactionId, int transactionType, String transactionChainId) {
+ this(transactionId, transactionType, transactionChainId, CURRENT_CLIENT_VERSION);
+ }
+ private CreateTransaction(String transactionId, int transactionType, String transactionChainId,
+ int clientVersion) {
this.transactionId = transactionId;
this.transactionType = transactionType;
this.transactionChainId = transactionChainId;
-
+ this.clientVersion = clientVersion;
}
-
public String getTransactionId() {
return transactionId;
}
return transactionType;
}
+ public int getClientVersion() {
+ return clientVersion;
+ }
+
@Override
public Object toSerializable() {
return ShardTransactionMessages.CreateTransaction.newBuilder()
.setTransactionId(transactionId)
.setTransactionType(transactionType)
- .setTransactionChainId(transactionChainId).build();
+ .setTransactionChainId(transactionChainId)
+ .setMessageVersion(clientVersion).build();
}
public static CreateTransaction fromSerializable(Object message) {
ShardTransactionMessages.CreateTransaction createTransaction =
(ShardTransactionMessages.CreateTransaction) message;
return new CreateTransaction(createTransaction.getTransactionId(),
- createTransaction.getTransactionType(), createTransaction.getTransactionChainId());
+ createTransaction.getTransactionType(), createTransaction.getTransactionChainId(),
+ createTransaction.getMessageVersion());
}
public String getTransactionChainId() {
private final DOMStoreThreePhaseCommitCohort cohort;
private final Modification modification;
private final boolean returnSerialized;
+ private final int txnClientVersion;
- public ForwardedReadyTransaction(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
- Modification modification, boolean returnSerialized) {
+ public ForwardedReadyTransaction(String transactionID, int txnClientVersion,
+ DOMStoreThreePhaseCommitCohort cohort, Modification modification,
+ boolean returnSerialized) {
this.transactionID = transactionID;
this.cohort = cohort;
this.modification = modification;
this.returnSerialized = returnSerialized;
-
+ this.txnClientVersion = txnClientVersion;
}
public String getTransactionID() {
public boolean isReturnSerialized() {
return returnSerialized;
}
+
+ public int getTxnClientVersion() {
+ return txnClientVersion;
+ }
}
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorRef;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.dispatch.Dispatchers;
-import akka.dispatch.OnComplete;
-import akka.japi.Creator;
-import akka.pattern.Patterns;
-import akka.testkit.TestActorRef;
-import akka.util.Timeout;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.Uninterruptibles;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.opendaylight.controller.cluster.datastore.messages.CreateTransaction.CURRENT_CLIENT_VERSION;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
+import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.dispatch.Dispatchers;
+import akka.dispatch.OnComplete;
+import akka.japi.Creator;
+import akka.pattern.Patterns;
+import akka.testkit.TestActorRef;
+import akka.util.Timeout;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
public class ShardTest extends AbstractActorTest {
// Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION,
+ cohort1, modification1, true), getRef());
ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
// Send the ForwardedReadyTransaction for the next 2 Tx's.
- shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION,
+ cohort2, modification2, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_CLIENT_VERSION,
+ cohort3, modification3, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
// Simulate the ForwardedReadyTransaction messages that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION,
+ cohort1, modification1, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION,
+ cohort2, modification2, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Send the CanCommitTransaction message for the first Tx.
// Simulate the ForwardedReadyTransaction messages that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_CLIENT_VERSION,
+ cohort, modification, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Send the CanCommitTransaction message.
// Simulate the ForwardedReadyTransaction messages that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_CLIENT_VERSION,
+ cohort, modification, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Send the CanCommitTransaction message.
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
modification, preCommit);
- shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_CLIENT_VERSION,
+ cohort, modification, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
// Ready the Tx's
- shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION,
+ cohort1, modification1, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION,
+ cohort2, modification2, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// canCommit 1st Tx. We don't send the commit so it should timeout.
// Ready the Tx's
- shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION,
+ cohort1, modification1, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION,
+ cohort2, modification2, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_CLIENT_VERSION,
+ cohort3, modification3, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// canCommit 1st Tx.
// Simulate the ForwardedReadyTransaction messages that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION,
+ cohort1, modification1, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION,
+ cohort2, modification2, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Send the CanCommitTransaction message for the first Tx.
};
}
- private NormalizedNode<?,?> readStore(TestActorRef<Shard> shard, YangInstanceIdentifier id)
+ static NormalizedNode<?,?> readStore(TestActorRef<Shard> shard, YangInstanceIdentifier id)
throws ExecutionException, InterruptedException {
DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.pattern.AskTimeoutException;
-import akka.testkit.TestActorRef;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.pattern.AskTimeoutException;
+import akka.testkit.TestActorRef;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
/**
* Covers negative test cases
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_CLIENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_CLIENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_CLIENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_CLIENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_CLIENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_CLIENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props, "testNegativeMergeTransactionReady");
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_CLIENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.Collections;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.duration.FiniteDuration;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.dispatch.Dispatchers;
+import akka.testkit.TestActorRef;
+
+/**
+ * Tests backwards compatibility support from Helium-1 to Helium.
+ *
+ * In Helium-1, the 3-phase commit support was moved from the ThreePhaseCommitCohort actor to the
+ * Shard. As a consequence, a new transactionId field was added to the CanCommitTransaction,
+ * CommitTransaction and AbortTransaction messages. With a base Helium version node, these messages
+ * would be sans transactionId so this test verifies the Shard handles that properly.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractActorTest {
+
+ @Test
+ public void testTransactionCommit() throws Exception {
+ new ShardTestKit(getSystem()) {{
+ SchemaContext schemaContext = TestModel.createTestContext();
+ Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1").
+ shardName("inventory").type("config").build(),
+ Collections.<ShardIdentifier,String>emptyMap(),
+ DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(),
+ schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId());
+
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), shardProps,
+ "testTransactionCommit");
+
+ waitUntilLeader(shard);
+
+ // Send CreateTransaction message with no messages version
+
+ String transactionID = "txn-1";
+ shard.tell(ShardTransactionMessages.CreateTransaction.newBuilder()
+ .setTransactionId(transactionID)
+ .setTransactionType(TransactionProxy.TransactionType.WRITE_ONLY.ordinal())
+ .setTransactionChainId("").build(), getRef());
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ CreateTransactionReply reply = expectMsgClass(duration, CreateTransactionReply.class);
+
+ ActorSelection txActor = getSystem().actorSelection(reply.getTransactionActorPath());
+
+ // Write data to the Tx
+
+ txActor.tell(new WriteData(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext), getRef());
+
+ expectMsgClass(duration, WriteDataReply.class);
+
+ // Ready the Tx
+
+ txActor.tell(new ReadyTransaction().toSerializable(), getRef());
+
+ ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(expectMsgClass(
+ duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
+
+ ActorSelection cohortActor = getSystem().actorSelection(readyReply.getCohortPath());
+
+ // Send the CanCommitTransaction message with no transactionId.
+
+ cohortActor.tell(ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder().build(),
+ getRef());
+
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+
+ // Send the PreCommitTransaction message with no transactionId.
+
+ cohortActor.tell(ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder().build(),
+ getRef());
+
+ expectMsgClass(duration, PreCommitTransactionReply.SERIALIZABLE_CLASS);
+
+ // Send the CommitTransaction message with no transactionId.
+
+ cohortActor.tell(ThreePhaseCommitCohortMessages.CommitTransaction.newBuilder().build(),
+ getRef());
+
+ expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+ NormalizedNode<?, ?> node = ShardTest.readStore(shard, TestModel.TEST_PATH);
+ Assert.assertNotNull("Data not found in store", node);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testTransactionAbort() throws Exception {
+ new ShardTestKit(getSystem()) {{
+ SchemaContext schemaContext = TestModel.createTestContext();
+ Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1").
+ shardName("inventory").type("config").build(),
+ Collections.<ShardIdentifier,String>emptyMap(),
+ DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(),
+ schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId());
+
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), shardProps,
+ "testTransactionAbort");
+
+ waitUntilLeader(shard);
+
+ // Send CreateTransaction message with no messages version
+
+ String transactionID = "txn-1";
+ shard.tell(ShardTransactionMessages.CreateTransaction.newBuilder()
+ .setTransactionId(transactionID)
+ .setTransactionType(TransactionProxy.TransactionType.WRITE_ONLY.ordinal())
+ .setTransactionChainId("").build(), getRef());
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ CreateTransactionReply reply = expectMsgClass(duration, CreateTransactionReply.class);
+
+ ActorSelection txActor = getSystem().actorSelection(reply.getTransactionActorPath());
+
+ // Write data to the Tx
+
+ txActor.tell(new WriteData(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext), getRef());
+
+ expectMsgClass(duration, WriteDataReply.class);
+
+ // Ready the Tx
+
+ txActor.tell(new ReadyTransaction().toSerializable(), getRef());
+
+ ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(expectMsgClass(
+ duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
+
+ ActorSelection cohortActor = getSystem().actorSelection(readyReply.getCohortPath());
+
+ // Send the CanCommitTransaction message with no transactionId.
+
+ cohortActor.tell(ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder().build(),
+ getRef());
+
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+
+ // Send the AbortTransaction message with no transactionId.
+
+ cohortActor.tell(ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder().build(),
+ getRef());
+
+ expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+}
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.actor.Terminated;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.Terminated;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
public class ShardTransactionTest extends AbstractActorTest {
private static ListeningExecutorService storeExecutor =
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_CLIENT_VERSION);
testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO"));
props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_CLIENT_VERSION);
testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
}
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_CLIENT_VERSION);
testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
props, "testReadDataWhenDataNotFoundRO"));
props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_CLIENT_VERSION);
testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
props, "testReadDataWhenDataNotFoundRW"));
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_CLIENT_VERSION);
testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO"));
props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_CLIENT_VERSION);
testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
}
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_CLIENT_VERSION);
testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO"));
props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_CLIENT_VERSION);
testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
}
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_CLIENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testWriteData");
transaction.tell(new WriteData(TestModel.TEST_PATH,
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_CLIENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testMergeData");
transaction.tell(new MergeData(TestModel.TEST_PATH,
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_CLIENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_CLIENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
watch(transaction);
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_CLIENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
watch(transaction);
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_CLIENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
watch(transaction);
public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_CLIENT_VERSION);
final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_CLIENT_VERSION);
final ActorRef transaction =
getSystem().actorOf(props, "testShardTransactionInactivity");