Deprecate ReadData/DataExists protobuff messages 78/33178/3
authorTom Pantelis <tpanteli@brocade.com>
Wed, 20 Jan 2016 12:57:38 +0000 (07:57 -0500)
committerAnil Vishnoi <vishnoianil@gmail.com>
Wed, 27 Jan 2016 19:35:48 +0000 (19:35 +0000)
Deprecated the associated ReadData, DataExists, and DataExistsReply protobuff
messages and changed the message classes to extend VersionedExternalizableMessage.
Backwards compatibility with pre-boron is maintained. Related code was modified
accordingly.

Change-Id: Ic9b1e79691ce77aecb36df38c1683deadac0c131
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
22 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbstractRead.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataExists.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataExistsReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadData.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationLimiterTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreBoronTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DataExistsReplyTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DataExistsTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataTest.java [new file with mode: 0644]

index a2560bd..d0bef00 100644 (file)
@@ -36,7 +36,6 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
 
     private final ActorContext actorContext;
     private final ActorSelection actor;
-    private final boolean isTxActorLocal;
     private final short remoteTransactionVersion;
     private final OperationLimiter limiter;
 
@@ -44,13 +43,11 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     private int totalBatchedModificationsSent;
 
     protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor,
-            ActorContext actorContext, boolean isTxActorLocal,
-            short remoteTransactionVersion, OperationLimiter limiter) {
+            ActorContext actorContext, short remoteTransactionVersion, OperationLimiter limiter) {
         super(identifier);
         this.limiter = Preconditions.checkNotNull(limiter);
         this.actor = actor;
         this.actorContext = actorContext;
-        this.isTxActorLocal = isTxActorLocal;
         this.remoteTransactionVersion = remoteTransactionVersion;
     }
 
@@ -72,7 +69,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     }
 
     protected Future<Object> executeOperationAsync(SerializableMessage msg) {
-        return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable()));
+        return completeOperation(actorContext.executeOperationAsync(getActor(), msg.toSerializable()));
     }
 
     @Override
@@ -209,7 +206,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
             }
         };
 
-        Future<Object> future = executeOperationAsync(readCmd);
+        Future<Object> future = executeOperationAsync(readCmd.asVersion(remoteTransactionVersion));
 
         future.onComplete(onComplete, actorContext.getClientDispatcher());
     }
index 8f5aade..4302ed0 100644 (file)
@@ -249,12 +249,8 @@ final class RemoteTransactionContextSupport {
 
     private TransactionContext createValidTransactionContext(ActorSelection transactionActor, String transactionPath,
             short remoteTransactionVersion) {
-        // TxActor is always created where the leader of the shard is.
-        // Check if TxActor is created in the same node
-        boolean isTxActorLocal = getActorContext().isPathLocal(transactionPath);
         final TransactionContext ret = new RemoteTransactionContext(transactionContextWrapper.getIdentifier(),
-                transactionActor, getActorContext(), isTxActorLocal, remoteTransactionVersion,
-                transactionContextWrapper.getLimiter());
+                transactionActor, getActorContext(), remoteTransactionVersion, transactionContextWrapper.getLimiter());
 
         if(parent.getType() == TransactionType.READ_ONLY) {
             TransactionContextCleanup.track(this, ret);
index 6a830a1..4bde37c 100644 (file)
@@ -37,18 +37,12 @@ public class ShardReadTransaction extends ShardTransaction {
 
     @Override
     public void handleReceive(Object message) throws Exception {
-        if(message instanceof ReadData) {
-            readData(transaction, (ReadData) message, !SERIALIZED_REPLY);
-
-        } else if (message instanceof DataExists) {
-            dataExists(transaction, (DataExists) message, !SERIALIZED_REPLY);
-        } else if (message instanceof CreateSnapshot) {
+        if (message instanceof CreateSnapshot) {
             createSnapshot();
-        } else if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            readData(transaction, ReadData.fromSerializable(message), SERIALIZED_REPLY);
-
-        } else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            dataExists(transaction, DataExists.fromSerializable(message), SERIALIZED_REPLY);
+        } else if(ReadData.isSerializedType(message)) {
+            readData(transaction, ReadData.fromSerializable(message));
+        } else if(DataExists.isSerializedType(message)) {
+            dataExists(transaction, DataExists.fromSerializable(message));
 
         } else {
             super.handleReceive(message);
index 0c358e0..90607de 100644 (file)
@@ -25,17 +25,10 @@ public class ShardReadWriteTransaction extends ShardWriteTransaction {
 
     @Override
     public void handleReceive(Object message) throws Exception {
-        if (message instanceof ReadData) {
-            readData((ReadData) message, !SERIALIZED_REPLY);
-
-        } else if (message instanceof DataExists) {
-            dataExists((DataExists) message, !SERIALIZED_REPLY);
-
-        } else if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            readData(ReadData.fromSerializable(message), SERIALIZED_REPLY);
-
-        } else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            dataExists(DataExists.fromSerializable(message), SERIALIZED_REPLY);
+        if(ReadData.isSerializedType(message)) {
+            readData(ReadData.fromSerializable(message));
+        } else if(DataExists.isSerializedType(message)) {
+            dataExists((DataExists) message);
         } else {
             super.handleReceive(message);
         }
index c6d13dc..82770a2 100644 (file)
@@ -41,9 +41,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
  * </p>
  */
 public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
-
-    protected static final boolean SERIALIZED_REPLY = true;
-
     private final ActorRef shardActor;
     private final ShardStats shardStats;
     private final String transactionID;
@@ -115,31 +112,25 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         return ret;
     }
 
-    protected void readData(AbstractShardDataTreeTransaction<?> transaction, ReadData message,
-            final boolean returnSerialized) {
-
+    protected void readData(AbstractShardDataTreeTransaction<?> transaction, ReadData message) {
         if (checkClosed(transaction)) {
             return;
         }
 
         final YangInstanceIdentifier path = message.getPath();
         Optional<NormalizedNode<?, ?>> optional = transaction.getSnapshot().readNode(path);
-        ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), clientTxVersion);
-        sender().tell((returnSerialized ? readDataReply.toSerializable(): readDataReply), self());
+        ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), message.getVersion());
+        sender().tell(readDataReply.toSerializable(), self());
     }
 
-    protected void dataExists(AbstractShardDataTreeTransaction<?> transaction, DataExists message,
-        final boolean returnSerialized) {
-
+    protected void dataExists(AbstractShardDataTreeTransaction<?> transaction, DataExists message) {
         if (checkClosed(transaction)) {
             return;
         }
 
         final YangInstanceIdentifier path = message.getPath();
         boolean exists = transaction.getSnapshot().readNode(path).isPresent();
-        DataExistsReply dataExistsReply = DataExistsReply.create(exists);
-        getSender().tell(returnSerialized ? dataExistsReply.toSerializable() :
-            dataExistsReply, getSelf());
+        getSender().tell(new DataExistsReply(exists, message.getVersion()).toSerializable(), getSelf());
     }
 
     private static class ShardTransactionCreator implements Creator<ShardTransaction> {
index 5bce73f..1a50d9b 100644 (file)
@@ -89,12 +89,12 @@ public class ShardWriteTransaction extends ShardTransaction {
         }
     }
 
-    protected final void dataExists(DataExists message, final boolean returnSerialized) {
-        super.dataExists(transaction, message, returnSerialized);
+    protected final void dataExists(DataExists message) {
+        super.dataExists(transaction, message);
     }
 
-    protected final void readData(ReadData message, final boolean returnSerialized) {
-        super.readData(transaction, message, returnSerialized);
+    protected final void readData(ReadData message) {
+        super.readData(transaction, message);
     }
 
     private boolean checkClosed() {
index d97c858..f645608 100644 (file)
@@ -74,7 +74,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
     @Override
     public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
-        return executeRead(shardNameFromIdentifier(path), new DataExists(path));
+        return executeRead(shardNameFromIdentifier(path), new DataExists(path, DataStoreVersions.CURRENT_VERSION));
     }
 
     private <T> CheckedFuture<T, ReadFailedException> executeRead(String shardName, final AbstractRead<T> readCmd) {
@@ -111,7 +111,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
     private CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> singleShardRead(
             final String shardName, final YangInstanceIdentifier path) {
-        return executeRead(shardName, new ReadData(path));
+        return executeRead(shardName, new ReadData(path, DataStoreVersions.CURRENT_VERSION));
     }
 
     private CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readAllData() {
index c1d83e9..e73b5af 100644 (file)
@@ -10,6 +10,10 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -20,10 +24,16 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
  * @author gwu
  *
  */
-public abstract class AbstractRead<T> implements SerializableMessage {
-    private final YangInstanceIdentifier path;
+public abstract class AbstractRead<T> extends VersionedExternalizableMessage {
+    private static final long serialVersionUID = 1L;
 
-    public AbstractRead(final YangInstanceIdentifier path) {
+    private YangInstanceIdentifier path;
+
+    protected AbstractRead() {
+    }
+
+    public AbstractRead(final YangInstanceIdentifier path, final short version) {
+        super(version);
         this.path = path;
     }
 
@@ -31,8 +41,25 @@ public abstract class AbstractRead<T> implements SerializableMessage {
         return path;
     }
 
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        super.readExternal(in);
+        path = SerializationUtils.deserializePath(in);
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+        SerializationUtils.serializePath(path, out);
+    }
+
+    public AbstractRead<T> asVersion(short version) {
+        return version == getVersion() ? this : newInstance(version);
+    }
+
     public abstract CheckedFuture<T, ReadFailedException> apply(DOMStoreReadTransaction readDelegate);
 
     public abstract void processResponse(Object reponse, SettableFuture<T> promise);
 
+    protected abstract AbstractRead<T> newInstance(short withVersion);
 }
index 2541a04..7d1bcdb 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
@@ -17,23 +18,23 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 public class DataExists extends AbstractRead<Boolean> {
+    private static final long serialVersionUID = 1L;
 
-    public static final Class<ShardTransactionMessages.DataExists> SERIALIZABLE_CLASS =
-            ShardTransactionMessages.DataExists.class;
-
-    public DataExists(final YangInstanceIdentifier path) {
-        super(path);
+    public DataExists() {
     }
 
-    @Override public Object toSerializable() {
-        return ShardTransactionMessages.DataExists.newBuilder()
-            .setInstanceIdentifierPathArguments(
-                InstanceIdentifierUtils.toSerializable(getPath())).build();
+    public DataExists(final YangInstanceIdentifier path, final short version) {
+        super(path, version);
     }
 
-    public static DataExists fromSerializable(final Object serializable){
-        ShardTransactionMessages.DataExists o = (ShardTransactionMessages.DataExists) serializable;
-        return new DataExists(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()));
+    @Override
+    public Object toSerializable() {
+        if(getVersion() >= DataStoreVersions.BORON_VERSION) {
+            return this;
+        } else {
+            return ShardTransactionMessages.DataExists.newBuilder().setInstanceIdentifierPathArguments(
+                        InstanceIdentifierUtils.toSerializable(getPath())).build();
+        }
     }
 
     @Override
@@ -43,15 +44,29 @@ public class DataExists extends AbstractRead<Boolean> {
 
     @Override
     public void processResponse(Object response, SettableFuture<Boolean> returnFuture) {
-        if(response instanceof DataExistsReply) {
-            returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists()));
-
-        } else if(response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
+        if(DataExistsReply.isSerializedType(response)) {
             returnFuture.set(Boolean.valueOf(DataExistsReply.fromSerializable(response).exists()));
-
         } else {
             returnFuture.setException(new ReadFailedException("Invalid response checking exists for path " + getPath()));
         }
     }
 
+    @Override
+    protected AbstractRead<Boolean> newInstance(short withVersion) {
+        return new DataExists(getPath(), withVersion);
+    }
+
+    public static DataExists fromSerializable(final Object serializable){
+        if(serializable instanceof DataExists) {
+            return (DataExists)serializable;
+        } else {
+            ShardTransactionMessages.DataExists o = (ShardTransactionMessages.DataExists) serializable;
+            return new DataExists(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()),
+                    DataStoreVersions.LITHIUM_VERSION);
+        }
+    }
+
+    public static boolean isSerializedType(Object message) {
+        return message instanceof DataExists || message instanceof ShardTransactionMessages.DataExists;
+    }
 }
index 12a4600..e68724a 100644 (file)
@@ -8,40 +8,65 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
-public class DataExistsReply implements SerializableMessage {
-    public static final Class<ShardTransactionMessages.DataExistsReply> SERIALIZABLE_CLASS =
-            ShardTransactionMessages.DataExistsReply.class;
+public class DataExistsReply extends VersionedExternalizableMessage {
+    private static final long serialVersionUID = 1L;
 
-    private static final DataExistsReply TRUE = new DataExistsReply(true, null);
-    private static final DataExistsReply FALSE = new DataExistsReply(false, null);
     private static final ShardTransactionMessages.DataExistsReply SERIALIZABLE_TRUE =
             ShardTransactionMessages.DataExistsReply.newBuilder().setExists(true).build();
     private static final ShardTransactionMessages.DataExistsReply SERIALIZABLE_FALSE =
             ShardTransactionMessages.DataExistsReply.newBuilder().setExists(false).build();
 
-    private final boolean exists;
+    private boolean exists;
 
-    private DataExistsReply(final boolean exists, final Void dummy) {
-        this.exists = exists;
+    public DataExistsReply() {
     }
 
-    public static DataExistsReply create(final boolean exists) {
-        return exists ? TRUE : FALSE;
+    public DataExistsReply(final boolean exists, final short version) {
+        super(version);
+        this.exists = exists;
     }
 
     public boolean exists() {
         return exists;
     }
 
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        super.readExternal(in);
+        exists = in.readBoolean();
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+        out.writeBoolean(exists);
+    }
+
     @Override
     public Object toSerializable() {
-        return exists ? SERIALIZABLE_TRUE : SERIALIZABLE_FALSE;
+        if(getVersion() >= DataStoreVersions.BORON_VERSION) {
+            return this;
+        } else {
+            return exists ? SERIALIZABLE_TRUE : SERIALIZABLE_FALSE;
+        }
     }
 
     public static DataExistsReply fromSerializable(final Object serializable) {
-        ShardTransactionMessages.DataExistsReply o = (ShardTransactionMessages.DataExistsReply) serializable;
-        return create(o.getExists());
+        if(serializable instanceof DataExistsReply) {
+            return (DataExistsReply)serializable;
+        } else {
+            ShardTransactionMessages.DataExistsReply o = (ShardTransactionMessages.DataExistsReply) serializable;
+            return new DataExistsReply(o.getExists(), DataStoreVersions.LITHIUM_VERSION);
+        }
+    }
+
+    public static boolean isSerializedType(Object message) {
+        return message instanceof DataExistsReply || message instanceof ShardTransactionMessages.DataExistsReply;
     }
 }
index f9ed312..c2709d7 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
@@ -19,21 +20,23 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 public class ReadData extends AbstractRead<Optional<NormalizedNode<?, ?>>> {
-    public static final Class<ShardTransactionMessages.ReadData> SERIALIZABLE_CLASS = ShardTransactionMessages.ReadData.class;
+    private static final long serialVersionUID = 1L;
 
-    public ReadData(final YangInstanceIdentifier path) {
-        super(path);
+    public ReadData() {
     }
 
-    @Override
-    public Object toSerializable() {
-        return ShardTransactionMessages.ReadData.newBuilder()
-                .setInstanceIdentifierPathArguments(InstanceIdentifierUtils.toSerializable(getPath())).build();
+    public ReadData(final YangInstanceIdentifier path, short version) {
+        super(path, version);
     }
 
-    public static ReadData fromSerializable(final Object serializable) {
-        ShardTransactionMessages.ReadData o = (ShardTransactionMessages.ReadData)serializable;
-        return new ReadData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()));
+    @Override
+    public Object toSerializable() {
+        if(getVersion() >= DataStoreVersions.BORON_VERSION) {
+            return this;
+        } else {
+            return ShardTransactionMessages.ReadData.newBuilder().setInstanceIdentifierPathArguments(
+                    InstanceIdentifierUtils.toSerializable(getPath())).build();
+        }
     }
 
     @Override
@@ -50,4 +53,23 @@ public class ReadData extends AbstractRead<Optional<NormalizedNode<?, ?>>> {
             returnFuture.setException(new ReadFailedException("Invalid response reading data for path " + getPath()));
         }
     }
+
+    @Override
+    protected AbstractRead<Optional<NormalizedNode<?, ?>>> newInstance(short withVersion) {
+        return new ReadData(getPath(), withVersion);
+    }
+
+    public static ReadData fromSerializable(final Object serializable) {
+        if(serializable instanceof ReadData) {
+            return (ReadData)serializable;
+        } else {
+            ShardTransactionMessages.ReadData o = (ShardTransactionMessages.ReadData)serializable;
+            return new ReadData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()),
+                    DataStoreVersions.LITHIUM_VERSION);
+        }
+    }
+
+    public static boolean isSerializedType(Object message) {
+        return message instanceof ReadData || message instanceof ShardTransactionMessages.ReadData;
+    }
 }
index 148869e..cf5bff0 100644 (file)
@@ -200,18 +200,6 @@ public abstract class AbstractTransactionProxyTest {
         return argThat(matcher);
     }
 
-    protected DataExists eqSerializedDataExists() {
-        ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
-            @Override
-            public boolean matches(Object argument) {
-                return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
-                       DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
-            }
-        };
-
-        return argThat(matcher);
-    }
-
     protected DataExists eqDataExists() {
         ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
             @Override
@@ -224,22 +212,6 @@ public abstract class AbstractTransactionProxyTest {
         return argThat(matcher);
     }
 
-    protected ReadData eqSerializedReadData() {
-        return eqSerializedReadData(TestModel.TEST_PATH);
-    }
-
-    protected ReadData eqSerializedReadData(final YangInstanceIdentifier path) {
-        ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
-            @Override
-            public boolean matches(Object argument) {
-                return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
-                       ReadData.fromSerializable(argument).getPath().equals(path);
-            }
-        };
-
-        return argThat(matcher);
-    }
-
     protected ReadData eqReadData() {
         return eqReadData(TestModel.TEST_PATH);
     }
@@ -265,7 +237,7 @@ public abstract class AbstractTransactionProxyTest {
     }
 
     protected Future<DataExistsReply> dataExistsReply(boolean exists) {
-        return Futures.successful(DataExistsReply.create(exists));
+        return Futures.successful(new DataExistsReply(exists, DataStoreVersions.CURRENT_VERSION));
     }
 
     protected Future<BatchedModificationsReply> batchedModificationsReply(int count) {
@@ -350,8 +322,6 @@ public abstract class AbstractTransactionProxyTest {
         doReturn(primaryShardInfoReply(actorSystem, actorRef, transactionVersion)).
                 when(mockActorContext).findPrimaryShardAsync(eq(shardName));
 
-        doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
-
         return actorRef;
     }
 
index 6a8ab62..bb32aca 100644 (file)
@@ -13,7 +13,6 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
-
 import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.Futures;
@@ -96,7 +95,8 @@ public class LocalTransactionContextTest {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
         doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction).read(yangInstanceIdentifier);
-        localTransactionContext.executeRead(new ReadData(yangInstanceIdentifier), SettableFuture.<Optional<NormalizedNode<?,?>>>create());
+        localTransactionContext.executeRead(new ReadData(yangInstanceIdentifier, DataStoreVersions.CURRENT_VERSION),
+                SettableFuture.<Optional<NormalizedNode<?,?>>>create());
         verify(readWriteTransaction).read(yangInstanceIdentifier);
     }
 
@@ -104,7 +104,8 @@ public class LocalTransactionContextTest {
     public void testExists() {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         doReturn(Futures.immediateCheckedFuture(true)).when(readWriteTransaction).exists(yangInstanceIdentifier);
-        localTransactionContext.executeRead(new DataExists(yangInstanceIdentifier), SettableFuture.<Boolean>create());
+        localTransactionContext.executeRead(new DataExists(yangInstanceIdentifier, DataStoreVersions.CURRENT_VERSION),
+                SettableFuture.<Boolean>create());
         verify(readWriteTransaction).exists(yangInstanceIdentifier);
     }
 
index 7d49090..ffe069b 100644 (file)
@@ -27,10 +27,10 @@ public class OperationLimiterTest {
         limiter.acquire(permits);
         int availablePermits = 0;
 
-        limiter.onComplete(null, DataExistsReply.create(true));
+        limiter.onComplete(null, new DataExistsReply(true, DataStoreVersions.CURRENT_VERSION));
         assertEquals("availablePermits", ++availablePermits, limiter.availablePermits());
 
-        limiter.onComplete(null, DataExistsReply.create(true));
+        limiter.onComplete(null, new DataExistsReply(true, DataStoreVersions.CURRENT_VERSION));
         assertEquals("availablePermits", ++availablePermits, limiter.availablePermits());
 
         limiter.onComplete(null, new IllegalArgumentException());
index f832d2a..83dae88 100644 (file)
@@ -892,7 +892,8 @@ public class ShardTest extends AbstractShardTest {
 
             final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
 
-            getSystem().actorSelection(createReply.getTransactionPath()).tell(new ReadData(path), getRef());
+            getSystem().actorSelection(createReply.getTransactionPath()).tell(
+                    new ReadData(path, DataStoreVersions.CURRENT_VERSION), getRef());
             final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
 
index 3ecc8c9..ae4426e 100644 (file)
@@ -66,12 +66,13 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
                 "testNegativeReadWithReadOnlyTransactionClosed");
 
         Future<Object> future = akka.pattern.Patterns.ask(subject,
-                new ReadData(YangInstanceIdentifier.EMPTY), 3000);
+                new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
         subject.underlyingActor().getDOMStoreTransaction().abort();
 
-        future = akka.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.EMPTY), 3000);
+        future = akka.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.EMPTY,
+                DataStoreVersions.CURRENT_VERSION), 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
     }
 
@@ -87,12 +88,13 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
                 "testNegativeReadWithReadWriteTransactionClosed");
 
         Future<Object> future = akka.pattern.Patterns.ask(subject,
-                new ReadData(YangInstanceIdentifier.EMPTY), 3000);
+                new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
         subject.underlyingActor().getDOMStoreTransaction().abort();
 
-        future = akka.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.EMPTY), 3000);
+        future = akka.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.EMPTY,
+                DataStoreVersions.CURRENT_VERSION), 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
     }
 
@@ -107,12 +109,13 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
                 "testNegativeExistsWithReadWriteTransactionClosed");
 
         Future<Object> future = akka.pattern.Patterns.ask(subject,
-                new DataExists(YangInstanceIdentifier.EMPTY), 3000);
+                new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
         subject.underlyingActor().getDOMStoreTransaction().abort();
 
-        future = akka.pattern.Patterns.ask(subject, new DataExists(YangInstanceIdentifier.EMPTY), 3000);
+        future = akka.pattern.Patterns.ask(subject,
+                new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
     }
 }
index 5333321..06e3a89 100644 (file)
@@ -117,16 +117,8 @@ public class ShardTransactionTest extends AbstractActorTest {
         }
 
         private void testOnReceiveReadData(final ActorRef transaction) {
-            //serialized read
-            transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
-                getRef());
-
-            Object replySerialized = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
-
-            assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
-
-            // unserialized read
-            transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
+            transaction.tell(new ReadData(YangInstanceIdentifier.builder().build(),
+                    DataStoreVersions.CURRENT_VERSION),getRef());
 
             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
 
@@ -147,15 +139,7 @@ public class ShardTransactionTest extends AbstractActorTest {
         }
 
         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
-            // serialized read
-            transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
-
-            Object replySerialized = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
-
-            assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
-
-            // unserialized read
-            transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
+            transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
 
             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
 
@@ -176,14 +160,8 @@ public class ShardTransactionTest extends AbstractActorTest {
         }
 
         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
-            transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
-                getRef());
-
-            Object replySerialized = expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
-            assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
-
-            // unserialized read
-            transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
+            transaction.tell(new DataExists(YangInstanceIdentifier.builder().build(),
+                    DataStoreVersions.CURRENT_VERSION),getRef());
 
             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
 
@@ -204,13 +182,7 @@ public class ShardTransactionTest extends AbstractActorTest {
         }
 
         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
-            transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
-
-            Object replySerialized = expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
-            assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
-
-            // unserialized read
-            transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
+            transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
 
             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
 
@@ -477,6 +449,35 @@ public class ShardTransactionTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testOnReceivePreBoronReadData() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(), createShard(),
+                    "testOnReceivePreBoronReadData");
+
+            transaction.tell(new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.LITHIUM_VERSION).
+                    toSerializable(), getRef());
+
+            Object replySerialized = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
+            assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
+        }};
+    }
+
+    @Test
+    public void testOnReceivePreBoronDataExists() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(), createShard(),
+                    "testOnReceivePreBoronDataExists");
+
+            transaction.tell(new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.LITHIUM_VERSION).
+                    toSerializable(), getRef());
+
+            Object replySerialized = expectMsgClass(duration("5 seconds"),
+                    ShardTransactionMessages.DataExistsReply.class);
+            assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
+        }};
+    }
+
     public static class TestException extends RuntimeException {
         private static final long serialVersionUID = 1L;
     }
index abb3d27..8f30331 100644 (file)
@@ -92,7 +92,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData());
+                eq(actorSelection(actorRef)), eqReadData());
 
         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
@@ -102,7 +102,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData());
+                eq(actorSelection(actorRef)), eqReadData());
 
         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
 
@@ -116,7 +116,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
+                executeOperationAsync(eq(actorSelection(actorRef)), eqReadData());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
@@ -128,7 +128,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
+                executeOperationAsync(eq(actorSelection(actorRef)), eqReadData());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
@@ -188,7 +188,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         expectBatchedModifications(actorRef, 1);
 
         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData());
+                eq(actorSelection(actorRef)), eqReadData());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
@@ -205,7 +205,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
 
         inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData());
+                eq(actorSelection(actorRef)), eqReadData());
     }
 
     @Test(expected=IllegalStateException.class)
@@ -240,14 +240,14 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
         doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedDataExists());
+                eq(actorSelection(actorRef)), eqDataExists());
 
         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
 
         assertEquals("Exists response", false, exists);
 
         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedDataExists());
+                eq(actorSelection(actorRef)), eqDataExists());
 
         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
 
@@ -269,7 +269,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
+                executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
@@ -281,7 +281,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
+                executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
@@ -297,7 +297,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         expectBatchedModifications(actorRef, 1);
 
         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedDataExists());
+                eq(actorSelection(actorRef)), eqDataExists());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
@@ -312,7 +312,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
 
         inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedDataExists());
+                eq(actorSelection(actorRef)), eqDataExists());
     }
 
     @Test(expected=IllegalStateException.class)
@@ -347,7 +347,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
 
         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData());
+                eq(actorSelection(actorRef)), eqReadData());
 
         expectBatchedModificationsReady(actorRef);
 
@@ -443,7 +443,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData());
+                eq(actorSelection(actorRef)), eqReadData());
 
         expectBatchedModifications(actorRef, 1);
 
@@ -471,7 +471,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData());
+                eq(actorSelection(actorRef)), eqReadData());
 
         expectBatchedModificationsReady(actorRef, true);
 
@@ -501,7 +501,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData());
+                eq(actorSelection(actorRef)), eqReadData());
 
         expectBatchedModificationsReady(actorRef, true);
 
@@ -761,7 +761,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData());
+                eq(actorSelection(actorRef)), eqReadData());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
@@ -773,75 +773,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                 eq(actorSelection(actorRef)), isA(CloseTransaction.class));
     }
 
-
-    /**
-     * Method to test a local Tx actor. The Tx paths are matched to decide if the
-     * Tx actor is local or not. This is done by mocking the Tx actor path
-     * and the caller paths and ensuring that the paths have the remote-address format
-     *
-     * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
-     * the paths returned for the actors for all the tests are not qualified remote paths.
-     * Hence are treated as non-local/remote actors. In short, all tests except
-     * few below run for remote actors
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testLocalTxActorRead() throws Exception {
-        setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
-        doReturn(true).when(mockActorContext).isPathLocal(anyString());
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
-
-        // negative test case with null as the reply
-        doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-            any(ActorSelection.class), eqReadData());
-
-        Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
-            TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
-
-        assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
-
-        // test case with node as read data reply
-        NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
-            any(ActorSelection.class), eqReadData());
-
-        readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
-
-        assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
-
-        assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
-
-        // test for local data exists
-        doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
-            any(ActorSelection.class), eqDataExists());
-
-        boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
-
-        assertEquals("Exists response", true, exists);
-    }
-
-    @Test
-    public void testLocalTxActorReady() throws Exception {
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-        doReturn(true).when(mockActorContext).isPathLocal(anyString());
-
-        expectBatchedModificationsReady(actorRef, true);
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
-        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
-        assertTrue(ready instanceof SingleCommitCohortProxy);
-
-        verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
-    }
-
     private static interface TransactionProxyOperation {
         void run(TransactionProxy transactionProxy);
     }
@@ -895,8 +826,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
                  eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
 
-        doReturn(true).when(mockActorContext).isPathLocal(actorPath);
-
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
         long start = System.nanoTime();
@@ -941,8 +870,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
                         eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
 
-        doReturn(true).when(mockActorContext).isPathLocal(anyString());
-
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
         long start = System.nanoTime();
@@ -1486,13 +1413,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
 
         doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
+                eq(actorSelection(actorRef)), eqReadData(writePath2));
 
         doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
+                eq(actorSelection(actorRef)), eqReadData(mergePath2));
 
         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedDataExists());
+                eq(actorSelection(actorRef)), eqDataExists());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
@@ -1534,19 +1461,19 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
 
         inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
+                eq(actorSelection(actorRef)), eqReadData(writePath2));
 
         inOrder.verify(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
 
         inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
+                eq(actorSelection(actorRef)), eqReadData(mergePath2));
 
         inOrder.verify(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
 
         inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedDataExists());
+                eq(actorSelection(actorRef)), eqDataExists());
     }
 
     @Test
@@ -1608,8 +1535,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).
                 when(mockActorContext).findPrimaryShardAsync(eq(shardName));
 
-        doReturn(true).when(mockActorContext).isPathLocal(shardActorRef.path().toString());
-
         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
 
         doReturn(actorSystem.actorSelection(txActorRef.path())).
@@ -1620,6 +1545,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                         eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
 
         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(txActorRef)), eqSerializedReadData(YangInstanceIdentifier.builder().build()));
+                eq(actorSelection(txActorRef)), eqReadData(YangInstanceIdentifier.builder().build()));
     }
 }
index 9dbeb5d..2ca4bca 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore.compat;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
@@ -19,6 +20,8 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.dispatch.Futures;
 import akka.util.Timeout;
+import com.google.common.base.Optional;
+import java.util.concurrent.TimeUnit;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
 import org.opendaylight.controller.cluster.datastore.AbstractTransactionProxyTest;
@@ -26,17 +29,22 @@ import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.TransactionProxy;
 import org.opendaylight.controller.cluster.datastore.TransactionType;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.DataExists;
+import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
 /**
  * TransactionProxy unit tests for backwards compatibility with pre-Boron versions.
  *
  * @author Thomas Pantelis
  */
+@SuppressWarnings("resource")
 public class PreBoronTransactionProxyTest extends AbstractTransactionProxyTest {
 
     private CreateTransaction eqLegacyCreateTransaction(final TransactionType type) {
@@ -56,14 +64,39 @@ public class PreBoronTransactionProxyTest extends AbstractTransactionProxyTest {
         return argThat(matcher);
     }
 
-    private CreateTransactionReply legacyCreateTransactionReply(ActorRef actorRef, int transactionVersion){
-        return CreateTransactionReply.newBuilder()
+    private ShardTransactionMessages.CreateTransactionReply legacyCreateTransactionReply(ActorRef actorRef,
+            int transactionVersion){
+        return ShardTransactionMessages.CreateTransactionReply.newBuilder()
             .setTransactionActorPath(actorRef.path().toString())
             .setTransactionId("txn-1")
             .setMessageVersion(transactionVersion)
             .build();
     }
 
+    private ReadData eqLegacySerializedReadData(final YangInstanceIdentifier path) {
+        ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
+            @Override
+            public boolean matches(Object argument) {
+                return ShardTransactionMessages.ReadData.class.equals(argument.getClass()) &&
+                       ReadData.fromSerializable(argument).getPath().equals(path);
+            }
+        };
+
+        return argThat(matcher);
+    }
+
+    private DataExists eqLegacySerializedDataExists() {
+        ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
+            @Override
+            public boolean matches(Object argument) {
+                return ShardTransactionMessages.DataExists.class.equals(argument.getClass()) &&
+                       DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
+            }
+        };
+
+        return argThat(matcher);
+    }
+
     private ActorRef setupPreBoronActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
             TransactionType type) {
         ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem,
@@ -83,23 +116,51 @@ public class PreBoronTransactionProxyTest extends AbstractTransactionProxyTest {
         }
 
         return txActorRef;
-
     }
 
     @Test
     public void testClose() throws Exception{
         ActorRef actorRef = setupPreBoronActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
-        doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData());
+        expectBatchedModifications(actorRef, 1);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
-        transactionProxy.read(TestModel.TEST_PATH);
+        transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
         transactionProxy.close();
 
         verify(mockActorContext).sendOperationAsync(
                 eq(actorSelection(actorRef)), isA(ShardTransactionMessages.CloseTransaction.class));
     }
+
+    @Test
+    public void testRead() throws Exception{
+        ActorRef actorRef = setupPreBoronActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+        NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqLegacySerializedReadData(TestModel.TEST_PATH));
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
+
+        Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
+                TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
+
+        assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+        assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
+    }
+
+    @Test
+    public void testExists() throws Exception{
+        ActorRef actorRef = setupPreBoronActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+        doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqLegacySerializedDataExists());
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
+
+        Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+        assertEquals("Exists response", true, exists);
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DataExistsReplyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DataExistsReplyTest.java
new file mode 100644 (file)
index 0000000..a4c9039
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+
+/**
+ * Unit tests for DataExistsReply.
+ *
+ * @author Thomas Pantelis
+ */
+public class DataExistsReplyTest {
+
+    @Test
+    public void testSerialization() {
+        DataExistsReply expected = new DataExistsReply(true, DataStoreVersions.CURRENT_VERSION);
+
+        Object serialized = expected.toSerializable();
+        assertEquals("Serialized type", DataExistsReply.class, serialized.getClass());
+
+        DataExistsReply actual = DataExistsReply.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+        assertEquals("exists", expected.exists(), actual.exists());
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
+    }
+
+    @Test
+    public void testSerializationWithPreBoronVersion() {
+        DataExistsReply expected = new DataExistsReply(true, DataStoreVersions.LITHIUM_VERSION);
+
+        Object serialized = expected.toSerializable();
+        assertEquals("Serialized type", ShardTransactionMessages.DataExistsReply.class, serialized.getClass());
+
+        DataExistsReply actual = DataExistsReply.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+        assertEquals("exists", expected.exists(), actual.exists());
+        assertEquals("getVersion", DataStoreVersions.LITHIUM_VERSION, actual.getVersion());
+    }
+
+    @Test
+    public void testIsSerializedType() {
+        assertEquals("isSerializedType", true, DataExistsReply.isSerializedType(
+                ShardTransactionMessages.DataExistsReply.newBuilder().setExists(true).build()));
+
+        assertEquals("isSerializedType", true, DataExistsReply.isSerializedType(new DataExistsReply()));
+        assertEquals("isSerializedType", false, DataExistsReply.isSerializedType(new Object()));
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DataExistsTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DataExistsTest.java
new file mode 100644 (file)
index 0000000..2c16e81
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+
+/**
+ * Unit tests for DataExists.
+ *
+ * @author Thomas Pantelis
+ */
+public class DataExistsTest {
+
+    @Test
+    public void testSerialization() {
+        DataExists expected = new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION);
+
+        Object serialized = expected.toSerializable();
+        assertEquals("Serialized type", DataExists.class, serialized.getClass());
+
+        DataExists actual = DataExists.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+        assertEquals("getPath", expected.getPath(), actual.getPath());
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
+    }
+
+    @Test
+    public void testSerializationWithPreBoronVersion() {
+        DataExists expected = new DataExists(TestModel.TEST_PATH, DataStoreVersions.LITHIUM_VERSION);
+
+        Object serialized = expected.toSerializable();
+        assertEquals("Serialized type", ShardTransactionMessages.DataExists.class, serialized.getClass());
+
+        DataExists actual = DataExists.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+        assertEquals("getPath", expected.getPath(), actual.getPath());
+        assertEquals("getVersion", DataStoreVersions.LITHIUM_VERSION, actual.getVersion());
+    }
+
+    @Test
+    public void testIsSerializedType() {
+        assertEquals("isSerializedType", true, DataExists.isSerializedType(
+                ShardTransactionMessages.DataExists.newBuilder().setInstanceIdentifierPathArguments(
+                        InstanceIdentifierUtils.toSerializable(TestModel.TEST_PATH)).build()));
+
+        assertEquals("isSerializedType", true, DataExists.isSerializedType(new DataExists()));
+        assertEquals("isSerializedType", false, DataExists.isSerializedType(new Object()));
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataTest.java
new file mode 100644 (file)
index 0000000..5690218
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+
+/**
+ * Unit tests for ReadData.
+ *
+ * @author Thomas Pantelis
+ */
+public class ReadDataTest {
+
+    @Test
+    public void testSerialization() {
+        ReadData expected = new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION);
+
+        Object serialized = expected.toSerializable();
+        assertEquals("Serialized type", ReadData.class, serialized.getClass());
+
+        ReadData actual = ReadData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+        assertEquals("getPath", expected.getPath(), actual.getPath());
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
+    }
+
+    @Test
+    public void testSerializationWithPreBoronVersion() {
+        ReadData expected = new ReadData(TestModel.TEST_PATH, DataStoreVersions.LITHIUM_VERSION);
+
+        Object serialized = expected.toSerializable();
+        assertEquals("Serialized type", ShardTransactionMessages.ReadData.class, serialized.getClass());
+
+        ReadData actual = ReadData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+        assertEquals("getPath", expected.getPath(), actual.getPath());
+        assertEquals("getVersion", DataStoreVersions.LITHIUM_VERSION, actual.getVersion());
+    }
+
+    @Test
+    public void testIsSerializedType() {
+        assertEquals("isSerializedType", true, ReadData.isSerializedType(
+                ShardTransactionMessages.ReadData.newBuilder().setInstanceIdentifierPathArguments(
+                        InstanceIdentifierUtils.toSerializable(TestModel.TEST_PATH)).build()));
+
+        assertEquals("isSerializedType", true, ReadData.isSerializedType(new ReadData()));
+        assertEquals("isSerializedType", false, ReadData.isSerializedType(new Object()));
+    }
+}