Deprecate CreateTransaction protobuff message 42/33142/5
authorTom Pantelis <tpanteli@brocade.com>
Wed, 20 Jan 2016 07:39:53 +0000 (02:39 -0500)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 27 Jan 2016 19:35:22 +0000 (19:35 +0000)
Deprecated the associated CreateTransaction and CreateTransactionReply
protobuff messages and changed the message classes to extend
VersionedExternalizableMessage. Backwards compatibility with
pre-boron is maintained. Related code was modified accordingly.

One thing of note is wrt CreateTransactionReply. Previously it had a
version field that represented the leader shard's version. This is used
by the transaction front-end to send the appropriate version for
subsequent messages. However the front-end
RemoteTransactionContextSupport already knows the leader shard's version
from the PrimaryShardInfo which is used for write-only tx's and now the
CreateTransaction message so, to be consistent, it now always uses the
PrimaryShardInfo's version when creating the context instance.

Also I realized that the message versioning would correctly handle
backwards compatibility, ie a newer version sending to an older version,
but not the other way around. So for a newer version, 2, sending to an
older version, 1, the message version would be 1 and would be serialized
in the older format and would be correctly de-serialized on the other
end. However, for an older version, 1, sending to a newer version, 2,
the message version would be 2 but it would be serialized in the older
format and, on the other end, the recipient would see the version as 2
and think it's the newer/current version and it may not de-serialize
correctly. What we really want is to use the lower of the recipient's
version and the sender's version. So I modified the
VersionedExternalizableMessage ctor to do that.

I had to make a change to ShardTransactionChain but then realized it's
no longer used so I just removed it.

Change-Id: I18051c48ec4a8f4251e7acef1e1c24063e53ef24
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
14 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedExternalizableMessage.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreBoronShardTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreBoronTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReplyTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionTest.java [new file with mode: 0644]

index 4fda059f3182ee898f7f4bc076ae7800159b3a77..d8dda25cc78d524d0ed7241500dbff50de9f669b 100644 (file)
@@ -82,7 +82,7 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
             } else {
                 RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextWrapper,
                         parent, shardName);
-                remote.setPrimaryShard(primaryShardInfo.getPrimaryShardActor(), primaryShardInfo.getPrimaryShardVersion());
+                remote.setPrimaryShard(primaryShardInfo);
             }
         } finally {
             onTransactionContextCreated(parent.getIdentifier());
index 4ce0767d1df7906e2b6bbeb82e787b2ac6e17772..8f5aade018aa936eb9b3a42fcc4e9a4539961afb 100644 (file)
@@ -48,7 +48,7 @@ final class RemoteTransactionContextSupport {
     /**
      * The target primary shard.
      */
-    private volatile ActorSelection primaryShard;
+    private volatile PrimaryShardInfo primaryShardInfo;
 
     /**
      * The total timeout for creating a tx on the primary shard.
@@ -97,18 +97,20 @@ final class RemoteTransactionContextSupport {
     /**
      * Sets the target primary shard and initiates a CreateTransaction try.
      */
-    void setPrimaryShard(ActorSelection primaryShard, short primaryVersion) {
-        this.primaryShard = primaryShard;
+    void setPrimaryShard(PrimaryShardInfo primaryShardInfo) {
+        this.primaryShardInfo = primaryShardInfo;
 
         if (getTransactionType() == TransactionType.WRITE_ONLY  &&
                 getActorContext().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
+            ActorSelection primaryShard = primaryShardInfo.getPrimaryShardActor();
+
             LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
                 getIdentifier(), primaryShard);
 
             // For write-only Tx's we prepare the transaction modifications directly on the shard actor
             // to avoid the overhead of creating a separate transaction actor.
-            transactionContextWrapper.executePriorTransactionOperations(createValidTransactionContext(this.primaryShard,
-                    this.primaryShard.path().toString(), primaryVersion));
+            transactionContextWrapper.executePriorTransactionOperations(createValidTransactionContext(
+                    primaryShard, primaryShard.path().toString(), primaryShardInfo.getPrimaryShardVersion()));
         } else {
             tryCreateTransaction();
         }
@@ -119,14 +121,16 @@ final class RemoteTransactionContextSupport {
      */
     private void tryCreateTransaction() {
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), primaryShard);
+            LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(),
+                    primaryShardInfo.getPrimaryShardActor());
         }
 
         Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(),
-            getTransactionType().ordinal(), getIdentifier().getChainId()).toSerializable();
+                getTransactionType().ordinal(), getIdentifier().getChainId(),
+                    primaryShardInfo.getPrimaryShardVersion()).toSerializable();
 
-        Future<Object> createTxFuture = getActorContext().executeOperationAsync(primaryShard,
-                serializedCreateMessage, createTxMessageTimeout);
+        Future<Object> createTxFuture = getActorContext().executeOperationAsync(
+                primaryShardInfo.getPrimaryShardActor(), serializedCreateMessage, createTxMessageTimeout);
 
         createTxFuture.onComplete(new OnComplete<Object>() {
             @Override
@@ -139,7 +143,7 @@ final class RemoteTransactionContextSupport {
     private void tryFindPrimaryShard() {
         LOG.debug("Tx {} Retrying findPrimaryShardAsync for shard {}", getIdentifier(), shardName);
 
-        this.primaryShard = null;
+        this.primaryShardInfo = null;
         Future<PrimaryShardInfo> findPrimaryFuture = getActorContext().findPrimaryShardAsync(shardName);
         findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
             @Override
@@ -151,7 +155,7 @@ final class RemoteTransactionContextSupport {
 
     private void onFindPrimaryShardComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
         if (failure == null) {
-            this.primaryShard = primaryShardInfo.getPrimaryShardActor();
+            this.primaryShardInfo = primaryShardInfo;
             tryCreateTransaction();
         } else {
             LOG.debug("Tx {}: Find primary for shard {} failed", getIdentifier(), shardName, failure);
@@ -163,7 +167,7 @@ final class RemoteTransactionContextSupport {
     private void onCreateTransactionComplete(Throwable failure, Object response) {
         // An AskTimeoutException will occur if the local shard forwards to an unavailable remote leader or
         // the cached remote leader actor is no longer available.
-        boolean retryCreateTransaction = this.primaryShard != null &&
+        boolean retryCreateTransaction = primaryShardInfo != null &&
                 (failure instanceof NoShardLeaderException || failure instanceof AskTimeoutException);
         if(retryCreateTransaction) {
             // Schedule a retry unless we're out of retries. Note: totalCreateTxTimeout is volatile as it may
@@ -223,7 +227,7 @@ final class RemoteTransactionContextSupport {
             }
 
             localTransactionContext = new NoOpTransactionContext(resultingEx, getIdentifier());
-        } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
+        } else if (CreateTransactionReply.isSerializedType(response)) {
             localTransactionContext = createValidTransactionContext(
                     CreateTransactionReply.fromSerializable(response));
         } else {
@@ -240,7 +244,7 @@ final class RemoteTransactionContextSupport {
         LOG.debug("Tx {} Received {}", getIdentifier(), reply);
 
         return createValidTransactionContext(getActorContext().actorSelection(reply.getTransactionPath()),
-                reply.getTransactionPath(), reply.getVersion());
+                reply.getTransactionPath(), primaryShardInfo.getPrimaryShardVersion());
     }
 
     private TransactionContext createValidTransactionContext(ActorSelection transactionActor, String transactionPath,
index 9ea22fc1967f117ec8284ffbaee140da27c394d9..3cab853f9adeaa069de9e9ce852ae94d32a94666 100644 (file)
@@ -226,7 +226,7 @@ public class Shard extends RaftActor {
         }
 
         try {
-            if (CreateTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
+            if (CreateTransaction.isSerializedType(message)) {
                 handleCreateTransaction(message);
             } else if (BatchedModifications.class.isInstance(message)) {
                 handleBatchedModifications((BatchedModifications)message);
@@ -551,7 +551,7 @@ public class Shard extends RaftActor {
                 createTransaction.getVersion());
 
             getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor),
-                    createTransaction.getTransactionId()).toSerializable(), getSelf());
+                    createTransaction.getTransactionId(), createTransaction.getVersion()).toSerializable(), getSelf());
         } catch (Exception e) {
             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
         }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
deleted file mode 100644 (file)
index aedc6c4..0000000
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import com.google.common.base.Preconditions;
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.japi.Creator;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-
-/**
- * The ShardTransactionChain Actor represents a remote TransactionChain
- */
-public class ShardTransactionChain extends AbstractUntypedActor {
-
-    private final ShardDataTreeTransactionChain chain;
-    private final DatastoreContext datastoreContext;
-    private final ShardStats shardStats;
-
-    public ShardTransactionChain(ShardDataTreeTransactionChain chain, DatastoreContext datastoreContext,
-            ShardStats shardStats) {
-        this.chain = Preconditions.checkNotNull(chain);
-        this.datastoreContext = datastoreContext;
-        this.shardStats = shardStats;
-    }
-
-    @Override
-    public void handleReceive(Object message) throws Exception {
-        if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
-            CreateTransaction createTransaction = CreateTransaction.fromSerializable( message);
-            createTransaction(createTransaction);
-        } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)) {
-            chain.close();
-            getSender().tell(CloseTransactionChainReply.INSTANCE.toSerializable(), getSelf());
-        }else{
-            unknownMessage(message);
-        }
-    }
-
-    private ActorRef getShardActor(){
-        return getContext().parent();
-    }
-
-    private ActorRef createTypedTransactionActor(CreateTransaction createTransaction) {
-        String transactionName = "shard-" + createTransaction.getTransactionId();
-
-        final TransactionType type = TransactionType.fromInt(createTransaction.getTransactionType());
-        final AbstractShardDataTreeTransaction<?> transaction;
-        switch (type) {
-        case READ_ONLY:
-            transaction = chain.newReadOnlyTransaction(transactionName);
-            break;
-        case READ_WRITE:
-        case WRITE_ONLY:
-            transaction = chain.newReadWriteTransaction(transactionName);
-            break;
-        default:
-            throw new IllegalArgumentException("Unhandled transaction type " + type);
-        }
-
-        return getContext().actorOf(
-            ShardTransaction.props(type, transaction, getShardActor(),
-                    datastoreContext, shardStats, createTransaction.getTransactionId(),
-                    createTransaction.getVersion()), transactionName);
-    }
-
-    private void createTransaction(CreateTransaction createTransaction) {
-
-        ActorRef transactionActor = createTypedTransactionActor(createTransaction);
-        getSender().tell(new CreateTransactionReply(transactionActor.path().toString(),
-                createTransaction.getTransactionId()).toSerializable(), getSelf());
-    }
-
-    public static Props props(ShardDataTreeTransactionChain chain, SchemaContext schemaContext,
-        DatastoreContext datastoreContext, ShardStats shardStats) {
-        return Props.create(new ShardTransactionChainCreator(chain, datastoreContext, shardStats));
-    }
-
-    private static class ShardTransactionChainCreator implements Creator<ShardTransactionChain> {
-        private static final long serialVersionUID = 1L;
-
-        final ShardDataTreeTransactionChain chain;
-        final DatastoreContext datastoreContext;
-        final ShardStats shardStats;
-
-        ShardTransactionChainCreator(ShardDataTreeTransactionChain chain, DatastoreContext datastoreContext,
-                ShardStats shardStats) {
-            this.chain = chain;
-            this.datastoreContext = datastoreContext;
-            this.shardStats = shardStats;
-        }
-
-        @Override
-        public ShardTransactionChain create() throws Exception {
-            return new ShardTransactionChain(chain, datastoreContext, shardStats);
-        }
-    }
-}
index 21f16e7bfeef07bf1c64ec97baa33b683420a216..036cb586144a92e683912292007b4f4a0b844cca 100644 (file)
@@ -8,35 +8,29 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-
 import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
+public class CreateTransaction extends VersionedExternalizableMessage {
+    private static final long serialVersionUID = 1L;
 
-public class CreateTransaction implements SerializableMessage {
-    public static final Class<ShardTransactionMessages.CreateTransaction> SERIALIZABLE_CLASS =
-            ShardTransactionMessages.CreateTransaction.class;
-
-    private final String transactionId;
-    private final int transactionType;
-    private final String transactionChainId;
-    private final short version;
+    private String transactionId;
+    private int transactionType;
+    private String transactionChainId;
 
-    public CreateTransaction(String transactionId, int transactionType) {
-        this(transactionId, transactionType, "");
+    public CreateTransaction() {
     }
 
-    public CreateTransaction(String transactionId, int transactionType, String transactionChainId) {
-        this(transactionId, transactionType, transactionChainId, DataStoreVersions.CURRENT_VERSION);
-    }
-
-    private CreateTransaction(String transactionId, int transactionType, String transactionChainId,
+    public CreateTransaction(String transactionId, int transactionType, String transactionChainId,
             short version) {
+        super(version);
         this.transactionId = Preconditions.checkNotNull(transactionId);
         this.transactionType = transactionType;
-        this.transactionChainId = transactionChainId;
-        this.version = version;
+        this.transactionChainId = transactionChainId != null ? transactionChainId : "";
     }
 
     public String getTransactionId() {
@@ -47,28 +41,56 @@ public class CreateTransaction implements SerializableMessage {
         return transactionType;
     }
 
-    public short getVersion() {
-        return version;
+    public String getTransactionChainId() {
+        return transactionChainId;
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        super.readExternal(in);
+        transactionId = in.readUTF();
+        transactionType = in.readInt();
+        transactionChainId = in.readUTF();
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+        out.writeUTF(transactionId);
+        out.writeInt(transactionType);
+        out.writeUTF(transactionChainId);
     }
 
     @Override
     public Object toSerializable() {
-        return ShardTransactionMessages.CreateTransaction.newBuilder()
-            .setTransactionId(transactionId)
-            .setTransactionType(transactionType)
-            .setTransactionChainId(transactionChainId)
-            .setMessageVersion(version).build();
+        if(getVersion() >= DataStoreVersions.BORON_VERSION) {
+            return this;
+        } else {
+            return ShardTransactionMessages.CreateTransaction.newBuilder()
+                .setTransactionId(transactionId).setTransactionType(transactionType)
+                .setTransactionChainId(transactionChainId).setMessageVersion(getVersion()).build();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "CreateTransaction [transactionId=" + transactionId + ", transactionType=" + transactionType
+                + ", transactionChainId=" + transactionChainId + "]";
     }
 
     public static CreateTransaction fromSerializable(Object message) {
-        ShardTransactionMessages.CreateTransaction createTransaction =
-            (ShardTransactionMessages.CreateTransaction) message;
-        return new CreateTransaction(createTransaction.getTransactionId(),
-            createTransaction.getTransactionType(), createTransaction.getTransactionChainId(),
-            (short)createTransaction.getMessageVersion());
+        if(message instanceof CreateTransaction) {
+            return (CreateTransaction)message;
+        } else {
+            ShardTransactionMessages.CreateTransaction createTransaction =
+                    (ShardTransactionMessages.CreateTransaction) message;
+            return new CreateTransaction(createTransaction.getTransactionId(),
+                    createTransaction.getTransactionType(), createTransaction.getTransactionChainId(),
+                    (short)createTransaction.getMessageVersion());
+        }
     }
 
-    public String getTransactionChainId() {
-        return transactionChainId;
+    public static boolean isSerializedType(Object message) {
+        return message instanceof CreateTransaction || message instanceof ShardTransactionMessages.CreateTransaction;
     }
 }
index c2bf81fa8e75fd0b5da2a700d37ab142e4815042..ec38f749e9657e5c1e633022a7a32ffba08a31e1 100644 (file)
@@ -8,28 +8,27 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
-public class CreateTransactionReply implements SerializableMessage {
+public class CreateTransactionReply extends VersionedExternalizableMessage {
+    private static final long serialVersionUID = 1L;
 
-    public static final Class<?> SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransactionReply.class;
-    private final String transactionPath;
-    private final String transactionId;
-    private final short version;
+    private String transactionPath;
+    private String transactionId;
 
-    public CreateTransactionReply(String transactionPath, String transactionId) {
-        this(transactionPath, transactionId, DataStoreVersions.CURRENT_VERSION);
+    public CreateTransactionReply() {
     }
 
-    public CreateTransactionReply(final String transactionPath,
-                                  final String transactionId, final short version) {
+    public CreateTransactionReply(final String transactionPath, final String transactionId, final short version) {
+        super(version);
         this.transactionPath = transactionPath;
         this.transactionId = transactionId;
-        this.version = version;
     }
 
-
     public String getTransactionPath() {
         return transactionPath;
     }
@@ -38,30 +37,51 @@ public class CreateTransactionReply implements SerializableMessage {
         return transactionId;
     }
 
-    public short getVersion() {
-        return version;
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        super.readExternal(in);
+        transactionId = in.readUTF();
+        transactionPath = in.readUTF();
     }
 
     @Override
-    public Object toSerializable(){
-        return ShardTransactionMessages.CreateTransactionReply.newBuilder()
-            .setTransactionActorPath(transactionPath)
-            .setTransactionId(transactionId)
-            .setMessageVersion(version)
-            .build();
+    public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+        out.writeUTF(transactionId);
+        out.writeUTF(transactionPath);
     }
 
-    public static CreateTransactionReply fromSerializable(Object serializable){
-        ShardTransactionMessages.CreateTransactionReply o = (ShardTransactionMessages.CreateTransactionReply) serializable;
-        return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId(),
-                (short)o.getMessageVersion());
+    @Override
+    public Object toSerializable() {
+        if(getVersion() >= DataStoreVersions.BORON_VERSION) {
+            return this;
+        } else {
+            return ShardTransactionMessages.CreateTransactionReply.newBuilder().setTransactionActorPath(transactionPath)
+                    .setTransactionId(transactionId).setMessageVersion(getVersion()).build();
+        }
     }
 
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
         builder.append("CreateTransactionReply [transactionPath=").append(transactionPath).append(", transactionId=")
-                .append(transactionId).append(", version=").append(version).append("]");
+                .append(transactionId).append(", version=").append(getVersion()).append("]");
         return builder.toString();
     }
+
+    public static CreateTransactionReply fromSerializable(Object serializable) {
+        if(serializable instanceof CreateTransactionReply) {
+            return (CreateTransactionReply)serializable;
+        } else {
+            ShardTransactionMessages.CreateTransactionReply o =
+                    (ShardTransactionMessages.CreateTransactionReply) serializable;
+            return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId(),
+                    (short)o.getMessageVersion());
+        }
+    }
+
+    public static boolean isSerializedType(Object message) {
+        return message instanceof CreateTransactionReply ||
+                message instanceof ShardTransactionMessages.CreateTransactionReply;
+    }
 }
index b34737be542d743e09bdd1a0d2df6abf18a4a78f..17b93f4b04d5f6d7445f6b2f1d262b7d41162d0d 100644 (file)
@@ -27,7 +27,7 @@ public abstract class VersionedExternalizableMessage implements Externalizable,
     }
 
     public VersionedExternalizableMessage(short version) {
-        this.version = version;
+        this.version = version <= DataStoreVersions.CURRENT_VERSION ? version: DataStoreVersions.CURRENT_VERSION;
     }
 
     public short getVersion() {
index 03edcf57a96027142fe4322100f590b0df563b2a..148869e63127c2d03c529c19f884a31eba36bd4a 100644 (file)
@@ -53,6 +53,7 @@ import org.opendaylight.controller.cluster.datastore.messages.BatchedModificatio
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
@@ -72,7 +73,6 @@ import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
@@ -187,7 +187,7 @@ public abstract class AbstractTransactionProxyTest {
         ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
             @Override
             public boolean matches(Object argument) {
-                if(CreateTransaction.SERIALIZABLE_CLASS.equals(argument.getClass())) {
+                if(CreateTransaction.class.equals(argument.getClass())) {
                     CreateTransaction obj = CreateTransaction.fromSerializable(argument);
                     return obj.getTransactionId().startsWith(memberName) &&
                             obj.getTransactionType() == type.ordinal();
@@ -317,12 +317,8 @@ public abstract class AbstractTransactionProxyTest {
                     eq(actorSelection(actorRef)), isA(ReadyLocalTransaction.class), any(Timeout.class));
     }
 
-    protected CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){
-        return CreateTransactionReply.newBuilder()
-            .setTransactionActorPath(actorRef.path().toString())
-            .setTransactionId("txn-1")
-            .setMessageVersion(transactionVersion)
-            .build();
+    protected CreateTransactionReply createTransactionReply(ActorRef actorRef, short transactionVersion){
+        return new CreateTransactionReply(actorRef.path().toString(), "txn-1", transactionVersion);
     }
 
     protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) {
index 23a200f6bcda05b11afcd24b248ed8145f400122..f832d2acc1c114fc5cc1371d919bd5fcc6abf208 100644 (file)
@@ -58,6 +58,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
@@ -98,8 +99,6 @@ import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
-import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
@@ -119,8 +118,6 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 public class ShardTest extends AbstractShardTest {
-    private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
-
     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
 
     @Test
@@ -356,13 +353,13 @@ public class ShardTest extends AbstractShardTest {
 
             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-            shard.tell(new CreateTransaction("txn-1",
-                    TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
+            shard.tell(new CreateTransaction("txn-1", TransactionType.READ_ONLY.ordinal(), null,
+                    DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
 
             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
                     CreateTransactionReply.class);
 
-            final String path = reply.getTransactionActorPath().toString();
+            final String path = reply.getTransactionPath().toString();
             assertTrue("Unexpected transaction path " + path,
                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
         }};
@@ -375,14 +372,13 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
-            shard.tell(new CreateTransaction("txn-1",
-                    TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
-                    getRef());
+            shard.tell(new CreateTransaction("txn-1",TransactionType.READ_ONLY.ordinal(), "foobar",
+                    DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
 
             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
                     CreateTransactionReply.class);
 
-            final String path = reply.getTransactionActorPath().toString();
+            final String path = reply.getTransactionPath().toString();
             assertTrue("Unexpected transaction path " + path,
                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
         }};
@@ -891,12 +887,12 @@ public class ShardTest extends AbstractShardTest {
 
             // Create a read Tx on the same chain.
 
-            shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
-                    transactionChainID).toSerializable(), getRef());
+            shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal(),
+                    transactionChainID, DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
 
             final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
 
-            getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
+            getSystem().actorSelection(createReply.getTransactionPath()).tell(new ReadData(path), getRef());
             final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
 
index 8426e26f1b8f200cccc2113f1995690eb56a8a01..abb3d27249735931715080ae5396ccd72cc14c6e 100644 (file)
@@ -51,6 +51,7 @@ import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
@@ -62,7 +63,6 @@ import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
@@ -932,9 +932,8 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
         String actorPath = txActorRef.path().toString();
-        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
-                setTransactionId("txn-1").setTransactionActorPath(actorPath).
-                setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
+        CreateTransactionReply createTransactionReply = new CreateTransactionReply(actorPath, "txn-1",
+                DataStoreVersions.CURRENT_VERSION);
 
         doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
 
@@ -992,7 +991,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         return dataTreeOptional;
     }
 
-    private static Optional<DataTree> createDataTree(NormalizedNode readResponse){
+    private static Optional<DataTree> createDataTree(NormalizedNode<?, ?> readResponse){
         DataTree dataTree = mock(DataTree.class);
         Optional<DataTree> dataTreeOptional = Optional.of(dataTree);
         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
@@ -1250,7 +1249,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testReadCompletionForLocalShard(){
-        final NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
         completeOperationLocal(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1326,7 +1325,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testExistsCompletionForLocalShard(){
-        final NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
         completeOperationLocal(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1580,6 +1579,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
 
+        @SuppressWarnings("unchecked")
         Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
 
         for(NormalizedNode<?,?> node : collection){
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreBoronShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreBoronShardTest.java
new file mode 100644 (file)
index 0000000..a6077e2
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.compat;
+
+import static org.junit.Assert.assertTrue;
+import akka.actor.ActorRef;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.ShardTestKit;
+import org.opendaylight.controller.cluster.datastore.TransactionType;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+
+/**
+ * Shard unit tests for backwards compatibility with pre-Boron versions.
+ *
+ * @author Thomas Pantelis
+ */
+public class PreBoronShardTest extends AbstractShardTest {
+
+    @Test
+    public void testCreateTransaction(){
+        new ShardTestKit(getSystem()) {{
+            final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransaction");
+
+            waitUntilLeader(shard);
+
+            shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+            shard.tell(new CreateTransaction("txn-1", TransactionType.READ_ONLY.ordinal(), null,
+                    DataStoreVersions.LITHIUM_VERSION).toSerializable(), getRef());
+
+            ShardTransactionMessages.CreateTransactionReply reply =
+                    expectMsgClass(ShardTransactionMessages.CreateTransactionReply.class);
+
+            final String path = CreateTransactionReply.fromSerializable(reply).getTransactionPath().toString();
+            assertTrue("Unexpected transaction path " + path,
+                    path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
+        }};
+    }
+}
index 4189912d5b4240e106396e621a0055647754a4d5..9dbeb5dfe1041cf936650071e3908f8c531239e9 100644 (file)
@@ -7,19 +7,30 @@
  */
 package org.opendaylight.controller.cluster.datastore.compat;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
 import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.dispatch.Futures;
+import akka.util.Timeout;
 import org.junit.Test;
+import org.mockito.ArgumentMatcher;
 import org.opendaylight.controller.cluster.datastore.AbstractTransactionProxyTest;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.TransactionProxy;
+import org.opendaylight.controller.cluster.datastore.TransactionType;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
 
 /**
  * TransactionProxy unit tests for backwards compatibility with pre-Boron versions.
@@ -28,10 +39,56 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti
  */
 public class PreBoronTransactionProxyTest extends AbstractTransactionProxyTest {
 
+    private CreateTransaction eqLegacyCreateTransaction(final TransactionType type) {
+        ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
+            @Override
+            public boolean matches(Object argument) {
+                if(ShardTransactionMessages.CreateTransaction.class.equals(argument.getClass())) {
+                    CreateTransaction obj = CreateTransaction.fromSerializable(argument);
+                    return obj.getTransactionId().startsWith(memberName) &&
+                            obj.getTransactionType() == type.ordinal();
+                }
+
+                return false;
+            }
+        };
+
+        return argThat(matcher);
+    }
+
+    private CreateTransactionReply legacyCreateTransactionReply(ActorRef actorRef, int transactionVersion){
+        return CreateTransactionReply.newBuilder()
+            .setTransactionActorPath(actorRef.path().toString())
+            .setTransactionId("txn-1")
+            .setMessageVersion(transactionVersion)
+            .build();
+    }
+
+    private ActorRef setupPreBoronActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
+            TransactionType type) {
+        ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem,
+                DefaultShardStrategy.DEFAULT_SHARD, DataStoreVersions.LITHIUM_VERSION);
+
+        ActorRef txActorRef;
+        if(type == TransactionType.WRITE_ONLY) {
+            txActorRef = shardActorRef;
+        } else {
+            txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+            doReturn(actorSystem.actorSelection(txActorRef.path())).
+                when(mockActorContext).actorSelection(txActorRef.path().toString());
+
+            doReturn(Futures.successful(legacyCreateTransactionReply(txActorRef, DataStoreVersions.LITHIUM_VERSION)))
+                .when(mockActorContext).executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+                        eqLegacyCreateTransaction(type), any(Timeout.class));
+        }
+
+        return txActorRef;
+
+    }
+
     @Test
     public void testClose() throws Exception{
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE,
-                DataStoreVersions.LITHIUM_VERSION, DefaultShardStrategy.DEFAULT_SHARD);
+        ActorRef actorRef = setupPreBoronActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReplyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReplyTest.java
new file mode 100644 (file)
index 0000000..19c7f01
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+
+/**
+ * Unit tests for CreateTransactionReply.
+ *
+ * @author Thomas Pantelis
+ */
+public class CreateTransactionReplyTest {
+
+    @Test
+    public void testSerialization() {
+        CreateTransactionReply expected = new CreateTransactionReply("txPath", "txId", DataStoreVersions.CURRENT_VERSION);
+
+        Object serialized = expected.toSerializable();
+        assertEquals("Serialized type", CreateTransactionReply.class, serialized.getClass());
+
+        CreateTransactionReply actual = CreateTransactionReply.fromSerializable(
+                SerializationUtils.clone((Serializable) serialized));
+        assertEquals("getTransactionId", expected.getTransactionId(), actual.getTransactionId());
+        assertEquals("getTransactionPath", expected.getTransactionPath(), actual.getTransactionPath());
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
+    }
+
+    @Test
+    public void testSerializationWithPreBoronVersion() {
+        CreateTransactionReply expected = new CreateTransactionReply("txPath", "txId", DataStoreVersions.LITHIUM_VERSION);
+
+        Object serialized = expected.toSerializable();
+        assertEquals("Serialized type", ShardTransactionMessages.CreateTransactionReply.class, serialized.getClass());
+
+        CreateTransactionReply actual = CreateTransactionReply.fromSerializable(
+                SerializationUtils.clone((Serializable) serialized));
+        assertEquals("getTransactionId", expected.getTransactionId(), actual.getTransactionId());
+        assertEquals("getTransactionPath", expected.getTransactionPath(), actual.getTransactionPath());
+        assertEquals("getVersion", DataStoreVersions.LITHIUM_VERSION, actual.getVersion());
+    }
+
+    @Test
+    public void testIsSerializedType() {
+        assertEquals("isSerializedType", true, CreateTransactionReply.isSerializedType(
+                ShardTransactionMessages.CreateTransactionReply.newBuilder().setTransactionActorPath("")
+                    .setTransactionId("").setMessageVersion(4).build()));
+
+        assertEquals("isSerializedType", true, CreateTransactionReply.isSerializedType(new CreateTransactionReply()));
+        assertEquals("isSerializedType", false, CreateTransactionReply.isSerializedType(new Object()));
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionTest.java
new file mode 100644 (file)
index 0000000..771d091
--- /dev/null
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+
+/**
+ * Unit tests for CreateTransaction.
+ *
+ * @author Thomas Pantelis
+ */
+public class CreateTransactionTest {
+
+    @Test
+    public void testSerialization() {
+        CreateTransaction expected = new CreateTransaction("txId", 2, "chainId", DataStoreVersions.CURRENT_VERSION);
+
+        Object serialized = expected.toSerializable();
+        assertEquals("Serialized type", CreateTransaction.class, serialized.getClass());
+
+        CreateTransaction actual = CreateTransaction.fromSerializable(
+                SerializationUtils.clone((Serializable) serialized));
+        assertEquals("getTransactionId", expected.getTransactionId(), actual.getTransactionId());
+        assertEquals("getTransactionType", expected.getTransactionType(), actual.getTransactionType());
+        assertEquals("getTransactionChainId", expected.getTransactionChainId(), actual.getTransactionChainId());
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
+    }
+
+    @Test
+    public void testSerializationWithPreBoronVersion() {
+        CreateTransaction expected = new CreateTransaction("txId", 2, "chainId", DataStoreVersions.LITHIUM_VERSION);
+
+        Object serialized = expected.toSerializable();
+        assertEquals("Serialized type", ShardTransactionMessages.CreateTransaction.class, serialized.getClass());
+
+        CreateTransaction actual = CreateTransaction.fromSerializable(
+                SerializationUtils.clone((Serializable) serialized));
+        assertEquals("getTransactionId", expected.getTransactionId(), actual.getTransactionId());
+        assertEquals("getTransactionType", expected.getTransactionType(), actual.getTransactionType());
+        assertEquals("getTransactionChainId", expected.getTransactionChainId(), actual.getTransactionChainId());
+        assertEquals("getVersion", DataStoreVersions.LITHIUM_VERSION, actual.getVersion());
+    }
+
+    @Test
+    public void testSerializationWithNewerVersion() {
+        short newerVersion = DataStoreVersions.CURRENT_VERSION + (short)1;
+        CreateTransaction expected = new CreateTransaction("txId", 2, "chainId", newerVersion);
+
+        Object serialized = expected.toSerializable();
+        assertEquals("Serialized type", CreateTransaction.class, serialized.getClass());
+
+        CreateTransaction actual = CreateTransaction.fromSerializable(
+                SerializationUtils.clone((Serializable) serialized));
+        assertEquals("getTransactionId", expected.getTransactionId(), actual.getTransactionId());
+        assertEquals("getTransactionType", expected.getTransactionType(), actual.getTransactionType());
+        assertEquals("getTransactionChainId", expected.getTransactionChainId(), actual.getTransactionChainId());
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
+    }
+
+    @Test
+    public void testIsSerializedType() {
+        assertEquals("isSerializedType", true, CreateTransaction.isSerializedType(
+                ShardTransactionMessages.CreateTransaction.newBuilder().setTransactionId("")
+                    .setTransactionType(2).setTransactionChainId("").setMessageVersion(4).build()));
+
+        assertEquals("isSerializedType", true, CreateTransaction.isSerializedType(new CreateTransaction()));
+        assertEquals("isSerializedType", false, CreateTransaction.isSerializedType(new Object()));
+    }
+}