Deprecate ReadData/DataExists protobuff messages 78/33178/3
authorTom Pantelis <tpanteli@brocade.com>
Wed, 20 Jan 2016 12:57:38 +0000 (07:57 -0500)
committerAnil Vishnoi <vishnoianil@gmail.com>
Wed, 27 Jan 2016 19:35:48 +0000 (19:35 +0000)
Deprecated the associated ReadData, DataExists, and DataExistsReply protobuff
messages and changed the message classes to extend VersionedExternalizableMessage.
Backwards compatibility with pre-boron is maintained. Related code was modified
accordingly.

Change-Id: Ic9b1e79691ce77aecb36df38c1683deadac0c131
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
22 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbstractRead.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataExists.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataExistsReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadData.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationLimiterTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreBoronTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DataExistsReplyTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DataExistsTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataTest.java [new file with mode: 0644]

index a2560bd369d353e46cfbc23aeaa780c1057f3b0c..d0bef0000e7e1a72432f914c9e25d906196faea8 100644 (file)
@@ -36,7 +36,6 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
 
     private final ActorContext actorContext;
     private final ActorSelection actor;
 
     private final ActorContext actorContext;
     private final ActorSelection actor;
-    private final boolean isTxActorLocal;
     private final short remoteTransactionVersion;
     private final OperationLimiter limiter;
 
     private final short remoteTransactionVersion;
     private final OperationLimiter limiter;
 
@@ -44,13 +43,11 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     private int totalBatchedModificationsSent;
 
     protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor,
     private int totalBatchedModificationsSent;
 
     protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor,
-            ActorContext actorContext, boolean isTxActorLocal,
-            short remoteTransactionVersion, OperationLimiter limiter) {
+            ActorContext actorContext, short remoteTransactionVersion, OperationLimiter limiter) {
         super(identifier);
         this.limiter = Preconditions.checkNotNull(limiter);
         this.actor = actor;
         this.actorContext = actorContext;
         super(identifier);
         this.limiter = Preconditions.checkNotNull(limiter);
         this.actor = actor;
         this.actorContext = actorContext;
-        this.isTxActorLocal = isTxActorLocal;
         this.remoteTransactionVersion = remoteTransactionVersion;
     }
 
         this.remoteTransactionVersion = remoteTransactionVersion;
     }
 
@@ -72,7 +69,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     }
 
     protected Future<Object> executeOperationAsync(SerializableMessage msg) {
     }
 
     protected Future<Object> executeOperationAsync(SerializableMessage msg) {
-        return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable()));
+        return completeOperation(actorContext.executeOperationAsync(getActor(), msg.toSerializable()));
     }
 
     @Override
     }
 
     @Override
@@ -209,7 +206,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
             }
         };
 
             }
         };
 
-        Future<Object> future = executeOperationAsync(readCmd);
+        Future<Object> future = executeOperationAsync(readCmd.asVersion(remoteTransactionVersion));
 
         future.onComplete(onComplete, actorContext.getClientDispatcher());
     }
 
         future.onComplete(onComplete, actorContext.getClientDispatcher());
     }
index 8f5aade018aa936eb9b3a42fcc4e9a4539961afb..4302ed05d31ce572338d0cc698e40ba3b83cae63 100644 (file)
@@ -249,12 +249,8 @@ final class RemoteTransactionContextSupport {
 
     private TransactionContext createValidTransactionContext(ActorSelection transactionActor, String transactionPath,
             short remoteTransactionVersion) {
 
     private TransactionContext createValidTransactionContext(ActorSelection transactionActor, String transactionPath,
             short remoteTransactionVersion) {
-        // TxActor is always created where the leader of the shard is.
-        // Check if TxActor is created in the same node
-        boolean isTxActorLocal = getActorContext().isPathLocal(transactionPath);
         final TransactionContext ret = new RemoteTransactionContext(transactionContextWrapper.getIdentifier(),
         final TransactionContext ret = new RemoteTransactionContext(transactionContextWrapper.getIdentifier(),
-                transactionActor, getActorContext(), isTxActorLocal, remoteTransactionVersion,
-                transactionContextWrapper.getLimiter());
+                transactionActor, getActorContext(), remoteTransactionVersion, transactionContextWrapper.getLimiter());
 
         if(parent.getType() == TransactionType.READ_ONLY) {
             TransactionContextCleanup.track(this, ret);
 
         if(parent.getType() == TransactionType.READ_ONLY) {
             TransactionContextCleanup.track(this, ret);
index 6a830a1ddbfd0a0645838cb9e7ed20c14ed976c7..4bde37c202c1c52215affad7f5134be3814fd08c 100644 (file)
@@ -37,18 +37,12 @@ public class ShardReadTransaction extends ShardTransaction {
 
     @Override
     public void handleReceive(Object message) throws Exception {
 
     @Override
     public void handleReceive(Object message) throws Exception {
-        if(message instanceof ReadData) {
-            readData(transaction, (ReadData) message, !SERIALIZED_REPLY);
-
-        } else if (message instanceof DataExists) {
-            dataExists(transaction, (DataExists) message, !SERIALIZED_REPLY);
-        } else if (message instanceof CreateSnapshot) {
+        if (message instanceof CreateSnapshot) {
             createSnapshot();
             createSnapshot();
-        } else if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            readData(transaction, ReadData.fromSerializable(message), SERIALIZED_REPLY);
-
-        } else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            dataExists(transaction, DataExists.fromSerializable(message), SERIALIZED_REPLY);
+        } else if(ReadData.isSerializedType(message)) {
+            readData(transaction, ReadData.fromSerializable(message));
+        } else if(DataExists.isSerializedType(message)) {
+            dataExists(transaction, DataExists.fromSerializable(message));
 
         } else {
             super.handleReceive(message);
 
         } else {
             super.handleReceive(message);
index 0c358e0b4b3bd120871832a0cdf680cce5b1721f..90607dea0f3e97bcc6806cdcbdd4bac0ccd1caa2 100644 (file)
@@ -25,17 +25,10 @@ public class ShardReadWriteTransaction extends ShardWriteTransaction {
 
     @Override
     public void handleReceive(Object message) throws Exception {
 
     @Override
     public void handleReceive(Object message) throws Exception {
-        if (message instanceof ReadData) {
-            readData((ReadData) message, !SERIALIZED_REPLY);
-
-        } else if (message instanceof DataExists) {
-            dataExists((DataExists) message, !SERIALIZED_REPLY);
-
-        } else if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            readData(ReadData.fromSerializable(message), SERIALIZED_REPLY);
-
-        } else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            dataExists(DataExists.fromSerializable(message), SERIALIZED_REPLY);
+        if(ReadData.isSerializedType(message)) {
+            readData(ReadData.fromSerializable(message));
+        } else if(DataExists.isSerializedType(message)) {
+            dataExists((DataExists) message);
         } else {
             super.handleReceive(message);
         }
         } else {
             super.handleReceive(message);
         }
index c6d13dcc8180553af87e0d75364999203e4b8636..82770a20bd4c87ea04a65728b54a89631c9b9b70 100644 (file)
@@ -41,9 +41,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
  * </p>
  */
 public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
  * </p>
  */
 public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
-
-    protected static final boolean SERIALIZED_REPLY = true;
-
     private final ActorRef shardActor;
     private final ShardStats shardStats;
     private final String transactionID;
     private final ActorRef shardActor;
     private final ShardStats shardStats;
     private final String transactionID;
@@ -115,31 +112,25 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         return ret;
     }
 
         return ret;
     }
 
-    protected void readData(AbstractShardDataTreeTransaction<?> transaction, ReadData message,
-            final boolean returnSerialized) {
-
+    protected void readData(AbstractShardDataTreeTransaction<?> transaction, ReadData message) {
         if (checkClosed(transaction)) {
             return;
         }
 
         final YangInstanceIdentifier path = message.getPath();
         Optional<NormalizedNode<?, ?>> optional = transaction.getSnapshot().readNode(path);
         if (checkClosed(transaction)) {
             return;
         }
 
         final YangInstanceIdentifier path = message.getPath();
         Optional<NormalizedNode<?, ?>> optional = transaction.getSnapshot().readNode(path);
-        ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), clientTxVersion);
-        sender().tell((returnSerialized ? readDataReply.toSerializable(): readDataReply), self());
+        ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), message.getVersion());
+        sender().tell(readDataReply.toSerializable(), self());
     }
 
     }
 
-    protected void dataExists(AbstractShardDataTreeTransaction<?> transaction, DataExists message,
-        final boolean returnSerialized) {
-
+    protected void dataExists(AbstractShardDataTreeTransaction<?> transaction, DataExists message) {
         if (checkClosed(transaction)) {
             return;
         }
 
         final YangInstanceIdentifier path = message.getPath();
         boolean exists = transaction.getSnapshot().readNode(path).isPresent();
         if (checkClosed(transaction)) {
             return;
         }
 
         final YangInstanceIdentifier path = message.getPath();
         boolean exists = transaction.getSnapshot().readNode(path).isPresent();
-        DataExistsReply dataExistsReply = DataExistsReply.create(exists);
-        getSender().tell(returnSerialized ? dataExistsReply.toSerializable() :
-            dataExistsReply, getSelf());
+        getSender().tell(new DataExistsReply(exists, message.getVersion()).toSerializable(), getSelf());
     }
 
     private static class ShardTransactionCreator implements Creator<ShardTransaction> {
     }
 
     private static class ShardTransactionCreator implements Creator<ShardTransaction> {
index 5bce73f939cd76fe1e4a8e93d882d1e29b50732c..1a50d9b06f1b62f93c488d19c9d57c686ca76b3a 100644 (file)
@@ -89,12 +89,12 @@ public class ShardWriteTransaction extends ShardTransaction {
         }
     }
 
         }
     }
 
-    protected final void dataExists(DataExists message, final boolean returnSerialized) {
-        super.dataExists(transaction, message, returnSerialized);
+    protected final void dataExists(DataExists message) {
+        super.dataExists(transaction, message);
     }
 
     }
 
-    protected final void readData(ReadData message, final boolean returnSerialized) {
-        super.readData(transaction, message, returnSerialized);
+    protected final void readData(ReadData message) {
+        super.readData(transaction, message);
     }
 
     private boolean checkClosed() {
     }
 
     private boolean checkClosed() {
index d97c858672f1ab92158fe574d84fd81f7c14deca..f645608dd9b96c327153c804f544a0b2eacb16b3 100644 (file)
@@ -74,7 +74,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
     @Override
     public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
 
     @Override
     public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
-        return executeRead(shardNameFromIdentifier(path), new DataExists(path));
+        return executeRead(shardNameFromIdentifier(path), new DataExists(path, DataStoreVersions.CURRENT_VERSION));
     }
 
     private <T> CheckedFuture<T, ReadFailedException> executeRead(String shardName, final AbstractRead<T> readCmd) {
     }
 
     private <T> CheckedFuture<T, ReadFailedException> executeRead(String shardName, final AbstractRead<T> readCmd) {
@@ -111,7 +111,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
     private CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> singleShardRead(
             final String shardName, final YangInstanceIdentifier path) {
 
     private CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> singleShardRead(
             final String shardName, final YangInstanceIdentifier path) {
-        return executeRead(shardName, new ReadData(path));
+        return executeRead(shardName, new ReadData(path, DataStoreVersions.CURRENT_VERSION));
     }
 
     private CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readAllData() {
     }
 
     private CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readAllData() {
index c1d83e989184a7795cb5b987438d4a562b67659b..e73b5af7cdecdf7a311c9a36f36bb975f19d17a3 100644 (file)
@@ -10,6 +10,10 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.SettableFuture;
 
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -20,10 +24,16 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
  * @author gwu
  *
  */
  * @author gwu
  *
  */
-public abstract class AbstractRead<T> implements SerializableMessage {
-    private final YangInstanceIdentifier path;
+public abstract class AbstractRead<T> extends VersionedExternalizableMessage {
+    private static final long serialVersionUID = 1L;
 
 
-    public AbstractRead(final YangInstanceIdentifier path) {
+    private YangInstanceIdentifier path;
+
+    protected AbstractRead() {
+    }
+
+    public AbstractRead(final YangInstanceIdentifier path, final short version) {
+        super(version);
         this.path = path;
     }
 
         this.path = path;
     }
 
@@ -31,8 +41,25 @@ public abstract class AbstractRead<T> implements SerializableMessage {
         return path;
     }
 
         return path;
     }
 
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        super.readExternal(in);
+        path = SerializationUtils.deserializePath(in);
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+        SerializationUtils.serializePath(path, out);
+    }
+
+    public AbstractRead<T> asVersion(short version) {
+        return version == getVersion() ? this : newInstance(version);
+    }
+
     public abstract CheckedFuture<T, ReadFailedException> apply(DOMStoreReadTransaction readDelegate);
 
     public abstract void processResponse(Object reponse, SettableFuture<T> promise);
 
     public abstract CheckedFuture<T, ReadFailedException> apply(DOMStoreReadTransaction readDelegate);
 
     public abstract void processResponse(Object reponse, SettableFuture<T> promise);
 
+    protected abstract AbstractRead<T> newInstance(short withVersion);
 }
 }
index 2541a04d5fe32969f6e4b0889a6c980033b155ba..7d1bcdb8f65c3273e50685eaa58c76cdc6d67d9b 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.SettableFuture;
 
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
@@ -17,23 +18,23 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 public class DataExists extends AbstractRead<Boolean> {
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 public class DataExists extends AbstractRead<Boolean> {
+    private static final long serialVersionUID = 1L;
 
 
-    public static final Class<ShardTransactionMessages.DataExists> SERIALIZABLE_CLASS =
-            ShardTransactionMessages.DataExists.class;
-
-    public DataExists(final YangInstanceIdentifier path) {
-        super(path);
+    public DataExists() {
     }
 
     }
 
-    @Override public Object toSerializable() {
-        return ShardTransactionMessages.DataExists.newBuilder()
-            .setInstanceIdentifierPathArguments(
-                InstanceIdentifierUtils.toSerializable(getPath())).build();
+    public DataExists(final YangInstanceIdentifier path, final short version) {
+        super(path, version);
     }
 
     }
 
-    public static DataExists fromSerializable(final Object serializable){
-        ShardTransactionMessages.DataExists o = (ShardTransactionMessages.DataExists) serializable;
-        return new DataExists(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()));
+    @Override
+    public Object toSerializable() {
+        if(getVersion() >= DataStoreVersions.BORON_VERSION) {
+            return this;
+        } else {
+            return ShardTransactionMessages.DataExists.newBuilder().setInstanceIdentifierPathArguments(
+                        InstanceIdentifierUtils.toSerializable(getPath())).build();
+        }
     }
 
     @Override
     }
 
     @Override
@@ -43,15 +44,29 @@ public class DataExists extends AbstractRead<Boolean> {
 
     @Override
     public void processResponse(Object response, SettableFuture<Boolean> returnFuture) {
 
     @Override
     public void processResponse(Object response, SettableFuture<Boolean> returnFuture) {
-        if(response instanceof DataExistsReply) {
-            returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists()));
-
-        } else if(response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
+        if(DataExistsReply.isSerializedType(response)) {
             returnFuture.set(Boolean.valueOf(DataExistsReply.fromSerializable(response).exists()));
             returnFuture.set(Boolean.valueOf(DataExistsReply.fromSerializable(response).exists()));
-
         } else {
             returnFuture.setException(new ReadFailedException("Invalid response checking exists for path " + getPath()));
         }
     }
 
         } else {
             returnFuture.setException(new ReadFailedException("Invalid response checking exists for path " + getPath()));
         }
     }
 
+    @Override
+    protected AbstractRead<Boolean> newInstance(short withVersion) {
+        return new DataExists(getPath(), withVersion);
+    }
+
+    public static DataExists fromSerializable(final Object serializable){
+        if(serializable instanceof DataExists) {
+            return (DataExists)serializable;
+        } else {
+            ShardTransactionMessages.DataExists o = (ShardTransactionMessages.DataExists) serializable;
+            return new DataExists(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()),
+                    DataStoreVersions.LITHIUM_VERSION);
+        }
+    }
+
+    public static boolean isSerializedType(Object message) {
+        return message instanceof DataExists || message instanceof ShardTransactionMessages.DataExists;
+    }
 }
 }
index 12a4600a4c28565e263868a17412c4e7cd847b2e..e68724a2626766091bd988f60ed54cd1b5fe9b16 100644 (file)
@@ -8,40 +8,65 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
-public class DataExistsReply implements SerializableMessage {
-    public static final Class<ShardTransactionMessages.DataExistsReply> SERIALIZABLE_CLASS =
-            ShardTransactionMessages.DataExistsReply.class;
+public class DataExistsReply extends VersionedExternalizableMessage {
+    private static final long serialVersionUID = 1L;
 
 
-    private static final DataExistsReply TRUE = new DataExistsReply(true, null);
-    private static final DataExistsReply FALSE = new DataExistsReply(false, null);
     private static final ShardTransactionMessages.DataExistsReply SERIALIZABLE_TRUE =
             ShardTransactionMessages.DataExistsReply.newBuilder().setExists(true).build();
     private static final ShardTransactionMessages.DataExistsReply SERIALIZABLE_FALSE =
             ShardTransactionMessages.DataExistsReply.newBuilder().setExists(false).build();
 
     private static final ShardTransactionMessages.DataExistsReply SERIALIZABLE_TRUE =
             ShardTransactionMessages.DataExistsReply.newBuilder().setExists(true).build();
     private static final ShardTransactionMessages.DataExistsReply SERIALIZABLE_FALSE =
             ShardTransactionMessages.DataExistsReply.newBuilder().setExists(false).build();
 
-    private final boolean exists;
+    private boolean exists;
 
 
-    private DataExistsReply(final boolean exists, final Void dummy) {
-        this.exists = exists;
+    public DataExistsReply() {
     }
 
     }
 
-    public static DataExistsReply create(final boolean exists) {
-        return exists ? TRUE : FALSE;
+    public DataExistsReply(final boolean exists, final short version) {
+        super(version);
+        this.exists = exists;
     }
 
     public boolean exists() {
         return exists;
     }
 
     }
 
     public boolean exists() {
         return exists;
     }
 
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        super.readExternal(in);
+        exists = in.readBoolean();
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+        out.writeBoolean(exists);
+    }
+
     @Override
     public Object toSerializable() {
     @Override
     public Object toSerializable() {
-        return exists ? SERIALIZABLE_TRUE : SERIALIZABLE_FALSE;
+        if(getVersion() >= DataStoreVersions.BORON_VERSION) {
+            return this;
+        } else {
+            return exists ? SERIALIZABLE_TRUE : SERIALIZABLE_FALSE;
+        }
     }
 
     public static DataExistsReply fromSerializable(final Object serializable) {
     }
 
     public static DataExistsReply fromSerializable(final Object serializable) {
-        ShardTransactionMessages.DataExistsReply o = (ShardTransactionMessages.DataExistsReply) serializable;
-        return create(o.getExists());
+        if(serializable instanceof DataExistsReply) {
+            return (DataExistsReply)serializable;
+        } else {
+            ShardTransactionMessages.DataExistsReply o = (ShardTransactionMessages.DataExistsReply) serializable;
+            return new DataExistsReply(o.getExists(), DataStoreVersions.LITHIUM_VERSION);
+        }
+    }
+
+    public static boolean isSerializedType(Object message) {
+        return message instanceof DataExistsReply || message instanceof ShardTransactionMessages.DataExistsReply;
     }
 }
     }
 }
index f9ed3124c95fe6a9ff17b6e256e53a82e6a58d4a..c2709d757fdacc83ea6932337dd27ee64f7ab46f 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
@@ -19,21 +20,23 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 public class ReadData extends AbstractRead<Optional<NormalizedNode<?, ?>>> {
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 public class ReadData extends AbstractRead<Optional<NormalizedNode<?, ?>>> {
-    public static final Class<ShardTransactionMessages.ReadData> SERIALIZABLE_CLASS = ShardTransactionMessages.ReadData.class;
+    private static final long serialVersionUID = 1L;
 
 
-    public ReadData(final YangInstanceIdentifier path) {
-        super(path);
+    public ReadData() {
     }
 
     }
 
-    @Override
-    public Object toSerializable() {
-        return ShardTransactionMessages.ReadData.newBuilder()
-                .setInstanceIdentifierPathArguments(InstanceIdentifierUtils.toSerializable(getPath())).build();
+    public ReadData(final YangInstanceIdentifier path, short version) {
+        super(path, version);
     }
 
     }
 
-    public static ReadData fromSerializable(final Object serializable) {
-        ShardTransactionMessages.ReadData o = (ShardTransactionMessages.ReadData)serializable;
-        return new ReadData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()));
+    @Override
+    public Object toSerializable() {
+        if(getVersion() >= DataStoreVersions.BORON_VERSION) {
+            return this;
+        } else {
+            return ShardTransactionMessages.ReadData.newBuilder().setInstanceIdentifierPathArguments(
+                    InstanceIdentifierUtils.toSerializable(getPath())).build();
+        }
     }
 
     @Override
     }
 
     @Override
@@ -50,4 +53,23 @@ public class ReadData extends AbstractRead<Optional<NormalizedNode<?, ?>>> {
             returnFuture.setException(new ReadFailedException("Invalid response reading data for path " + getPath()));
         }
     }
             returnFuture.setException(new ReadFailedException("Invalid response reading data for path " + getPath()));
         }
     }
+
+    @Override
+    protected AbstractRead<Optional<NormalizedNode<?, ?>>> newInstance(short withVersion) {
+        return new ReadData(getPath(), withVersion);
+    }
+
+    public static ReadData fromSerializable(final Object serializable) {
+        if(serializable instanceof ReadData) {
+            return (ReadData)serializable;
+        } else {
+            ShardTransactionMessages.ReadData o = (ShardTransactionMessages.ReadData)serializable;
+            return new ReadData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()),
+                    DataStoreVersions.LITHIUM_VERSION);
+        }
+    }
+
+    public static boolean isSerializedType(Object message) {
+        return message instanceof ReadData || message instanceof ShardTransactionMessages.ReadData;
+    }
 }
 }
index 148869e63127c2d03c529c19f884a31eba36bd4a..cf5bff0846ba8cc3635f958edac2892b00554493 100644 (file)
@@ -200,18 +200,6 @@ public abstract class AbstractTransactionProxyTest {
         return argThat(matcher);
     }
 
         return argThat(matcher);
     }
 
-    protected DataExists eqSerializedDataExists() {
-        ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
-            @Override
-            public boolean matches(Object argument) {
-                return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
-                       DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
-            }
-        };
-
-        return argThat(matcher);
-    }
-
     protected DataExists eqDataExists() {
         ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
             @Override
     protected DataExists eqDataExists() {
         ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
             @Override
@@ -224,22 +212,6 @@ public abstract class AbstractTransactionProxyTest {
         return argThat(matcher);
     }
 
         return argThat(matcher);
     }
 
-    protected ReadData eqSerializedReadData() {
-        return eqSerializedReadData(TestModel.TEST_PATH);
-    }
-
-    protected ReadData eqSerializedReadData(final YangInstanceIdentifier path) {
-        ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
-            @Override
-            public boolean matches(Object argument) {
-                return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
-                       ReadData.fromSerializable(argument).getPath().equals(path);
-            }
-        };
-
-        return argThat(matcher);
-    }
-
     protected ReadData eqReadData() {
         return eqReadData(TestModel.TEST_PATH);
     }
     protected ReadData eqReadData() {
         return eqReadData(TestModel.TEST_PATH);
     }
@@ -265,7 +237,7 @@ public abstract class AbstractTransactionProxyTest {
     }
 
     protected Future<DataExistsReply> dataExistsReply(boolean exists) {
     }
 
     protected Future<DataExistsReply> dataExistsReply(boolean exists) {
-        return Futures.successful(DataExistsReply.create(exists));
+        return Futures.successful(new DataExistsReply(exists, DataStoreVersions.CURRENT_VERSION));
     }
 
     protected Future<BatchedModificationsReply> batchedModificationsReply(int count) {
     }
 
     protected Future<BatchedModificationsReply> batchedModificationsReply(int count) {
@@ -350,8 +322,6 @@ public abstract class AbstractTransactionProxyTest {
         doReturn(primaryShardInfoReply(actorSystem, actorRef, transactionVersion)).
                 when(mockActorContext).findPrimaryShardAsync(eq(shardName));
 
         doReturn(primaryShardInfoReply(actorSystem, actorRef, transactionVersion)).
                 when(mockActorContext).findPrimaryShardAsync(eq(shardName));
 
-        doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
-
         return actorRef;
     }
 
         return actorRef;
     }
 
index 6a8ab620c581a6ebdd449f32e94480c46e0ac48a..bb32aca53313bd0d88272361f15f354eb9fdc5d8 100644 (file)
@@ -13,7 +13,6 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
-
 import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.Futures;
 import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.Futures;
@@ -96,7 +95,8 @@ public class LocalTransactionContextTest {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
         doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction).read(yangInstanceIdentifier);
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
         doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction).read(yangInstanceIdentifier);
-        localTransactionContext.executeRead(new ReadData(yangInstanceIdentifier), SettableFuture.<Optional<NormalizedNode<?,?>>>create());
+        localTransactionContext.executeRead(new ReadData(yangInstanceIdentifier, DataStoreVersions.CURRENT_VERSION),
+                SettableFuture.<Optional<NormalizedNode<?,?>>>create());
         verify(readWriteTransaction).read(yangInstanceIdentifier);
     }
 
         verify(readWriteTransaction).read(yangInstanceIdentifier);
     }
 
@@ -104,7 +104,8 @@ public class LocalTransactionContextTest {
     public void testExists() {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         doReturn(Futures.immediateCheckedFuture(true)).when(readWriteTransaction).exists(yangInstanceIdentifier);
     public void testExists() {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         doReturn(Futures.immediateCheckedFuture(true)).when(readWriteTransaction).exists(yangInstanceIdentifier);
-        localTransactionContext.executeRead(new DataExists(yangInstanceIdentifier), SettableFuture.<Boolean>create());
+        localTransactionContext.executeRead(new DataExists(yangInstanceIdentifier, DataStoreVersions.CURRENT_VERSION),
+                SettableFuture.<Boolean>create());
         verify(readWriteTransaction).exists(yangInstanceIdentifier);
     }
 
         verify(readWriteTransaction).exists(yangInstanceIdentifier);
     }
 
index 7d49090f685e3cbc879c6fdecfb465fc60a1d379..ffe069b78c38e0bd4c1ad47002aa070be72c4ec7 100644 (file)
@@ -27,10 +27,10 @@ public class OperationLimiterTest {
         limiter.acquire(permits);
         int availablePermits = 0;
 
         limiter.acquire(permits);
         int availablePermits = 0;
 
-        limiter.onComplete(null, DataExistsReply.create(true));
+        limiter.onComplete(null, new DataExistsReply(true, DataStoreVersions.CURRENT_VERSION));
         assertEquals("availablePermits", ++availablePermits, limiter.availablePermits());
 
         assertEquals("availablePermits", ++availablePermits, limiter.availablePermits());
 
-        limiter.onComplete(null, DataExistsReply.create(true));
+        limiter.onComplete(null, new DataExistsReply(true, DataStoreVersions.CURRENT_VERSION));
         assertEquals("availablePermits", ++availablePermits, limiter.availablePermits());
 
         limiter.onComplete(null, new IllegalArgumentException());
         assertEquals("availablePermits", ++availablePermits, limiter.availablePermits());
 
         limiter.onComplete(null, new IllegalArgumentException());
index f832d2acc1c114fc5cc1371d919bd5fcc6abf208..83dae880bad3fceef0133fdf692884ca2297f5c7 100644 (file)
@@ -892,7 +892,8 @@ public class ShardTest extends AbstractShardTest {
 
             final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
 
 
             final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
 
-            getSystem().actorSelection(createReply.getTransactionPath()).tell(new ReadData(path), getRef());
+            getSystem().actorSelection(createReply.getTransactionPath()).tell(
+                    new ReadData(path, DataStoreVersions.CURRENT_VERSION), getRef());
             final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
 
             final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
 
index 3ecc8c9902d0a65fd02f7dfb144dd4d830322ea3..ae4426e35b8281f4c88f1ec6a4b162da67be9ba3 100644 (file)
@@ -66,12 +66,13 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
                 "testNegativeReadWithReadOnlyTransactionClosed");
 
         Future<Object> future = akka.pattern.Patterns.ask(subject,
                 "testNegativeReadWithReadOnlyTransactionClosed");
 
         Future<Object> future = akka.pattern.Patterns.ask(subject,
-                new ReadData(YangInstanceIdentifier.EMPTY), 3000);
+                new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
         subject.underlyingActor().getDOMStoreTransaction().abort();
 
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
         subject.underlyingActor().getDOMStoreTransaction().abort();
 
-        future = akka.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.EMPTY), 3000);
+        future = akka.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.EMPTY,
+                DataStoreVersions.CURRENT_VERSION), 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
     }
 
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
     }
 
@@ -87,12 +88,13 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
                 "testNegativeReadWithReadWriteTransactionClosed");
 
         Future<Object> future = akka.pattern.Patterns.ask(subject,
                 "testNegativeReadWithReadWriteTransactionClosed");
 
         Future<Object> future = akka.pattern.Patterns.ask(subject,
-                new ReadData(YangInstanceIdentifier.EMPTY), 3000);
+                new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
         subject.underlyingActor().getDOMStoreTransaction().abort();
 
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
         subject.underlyingActor().getDOMStoreTransaction().abort();
 
-        future = akka.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.EMPTY), 3000);
+        future = akka.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.EMPTY,
+                DataStoreVersions.CURRENT_VERSION), 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
     }
 
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
     }
 
@@ -107,12 +109,13 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
                 "testNegativeExistsWithReadWriteTransactionClosed");
 
         Future<Object> future = akka.pattern.Patterns.ask(subject,
                 "testNegativeExistsWithReadWriteTransactionClosed");
 
         Future<Object> future = akka.pattern.Patterns.ask(subject,
-                new DataExists(YangInstanceIdentifier.EMPTY), 3000);
+                new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
         subject.underlyingActor().getDOMStoreTransaction().abort();
 
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
         subject.underlyingActor().getDOMStoreTransaction().abort();
 
-        future = akka.pattern.Patterns.ask(subject, new DataExists(YangInstanceIdentifier.EMPTY), 3000);
+        future = akka.pattern.Patterns.ask(subject,
+                new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
     }
 }
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
     }
 }
index 5333321a426842c428370f42e045ab16f0909234..06e3a8953a5389111731c99cded23e15588ba14f 100644 (file)
@@ -117,16 +117,8 @@ public class ShardTransactionTest extends AbstractActorTest {
         }
 
         private void testOnReceiveReadData(final ActorRef transaction) {
         }
 
         private void testOnReceiveReadData(final ActorRef transaction) {
-            //serialized read
-            transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
-                getRef());
-
-            Object replySerialized = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
-
-            assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
-
-            // unserialized read
-            transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
+            transaction.tell(new ReadData(YangInstanceIdentifier.builder().build(),
+                    DataStoreVersions.CURRENT_VERSION),getRef());
 
             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
 
 
             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
 
@@ -147,15 +139,7 @@ public class ShardTransactionTest extends AbstractActorTest {
         }
 
         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
         }
 
         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
-            // serialized read
-            transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
-
-            Object replySerialized = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
-
-            assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
-
-            // unserialized read
-            transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
+            transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
 
             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
 
 
             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
 
@@ -176,14 +160,8 @@ public class ShardTransactionTest extends AbstractActorTest {
         }
 
         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
         }
 
         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
-            transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
-                getRef());
-
-            Object replySerialized = expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
-            assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
-
-            // unserialized read
-            transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
+            transaction.tell(new DataExists(YangInstanceIdentifier.builder().build(),
+                    DataStoreVersions.CURRENT_VERSION),getRef());
 
             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
 
 
             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
 
@@ -204,13 +182,7 @@ public class ShardTransactionTest extends AbstractActorTest {
         }
 
         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
         }
 
         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
-            transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
-
-            Object replySerialized = expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
-            assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
-
-            // unserialized read
-            transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
+            transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
 
             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
 
 
             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
 
@@ -477,6 +449,35 @@ public class ShardTransactionTest extends AbstractActorTest {
         }};
     }
 
         }};
     }
 
+    @Test
+    public void testOnReceivePreBoronReadData() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(), createShard(),
+                    "testOnReceivePreBoronReadData");
+
+            transaction.tell(new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.LITHIUM_VERSION).
+                    toSerializable(), getRef());
+
+            Object replySerialized = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
+            assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
+        }};
+    }
+
+    @Test
+    public void testOnReceivePreBoronDataExists() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(), createShard(),
+                    "testOnReceivePreBoronDataExists");
+
+            transaction.tell(new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.LITHIUM_VERSION).
+                    toSerializable(), getRef());
+
+            Object replySerialized = expectMsgClass(duration("5 seconds"),
+                    ShardTransactionMessages.DataExistsReply.class);
+            assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
+        }};
+    }
+
     public static class TestException extends RuntimeException {
         private static final long serialVersionUID = 1L;
     }
     public static class TestException extends RuntimeException {
         private static final long serialVersionUID = 1L;
     }
index abb3d27249735931715080ae5396ccd72cc14c6e..8f30331550dddd539c39cf8e8067610547ec4fd5 100644 (file)
@@ -92,7 +92,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData());
+                eq(actorSelection(actorRef)), eqReadData());
 
         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
 
         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
@@ -102,7 +102,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData());
+                eq(actorSelection(actorRef)), eqReadData());
 
         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
 
 
         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
 
@@ -116,7 +116,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
+                executeOperationAsync(eq(actorSelection(actorRef)), eqReadData());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
@@ -128,7 +128,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
+                executeOperationAsync(eq(actorSelection(actorRef)), eqReadData());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
@@ -188,7 +188,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         expectBatchedModifications(actorRef, 1);
 
         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
         expectBatchedModifications(actorRef, 1);
 
         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData());
+                eq(actorSelection(actorRef)), eqReadData());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
@@ -205,7 +205,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
 
         inOrder.verify(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
 
         inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData());
+                eq(actorSelection(actorRef)), eqReadData());
     }
 
     @Test(expected=IllegalStateException.class)
     }
 
     @Test(expected=IllegalStateException.class)
@@ -240,14 +240,14 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
         doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
         doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedDataExists());
+                eq(actorSelection(actorRef)), eqDataExists());
 
         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
 
         assertEquals("Exists response", false, exists);
 
         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
 
         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
 
         assertEquals("Exists response", false, exists);
 
         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedDataExists());
+                eq(actorSelection(actorRef)), eqDataExists());
 
         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
 
 
         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
 
@@ -269,7 +269,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
+                executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
@@ -281,7 +281,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
+                executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
@@ -297,7 +297,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         expectBatchedModifications(actorRef, 1);
 
         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
         expectBatchedModifications(actorRef, 1);
 
         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedDataExists());
+                eq(actorSelection(actorRef)), eqDataExists());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
@@ -312,7 +312,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
 
         inOrder.verify(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
 
         inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedDataExists());
+                eq(actorSelection(actorRef)), eqDataExists());
     }
 
     @Test(expected=IllegalStateException.class)
     }
 
     @Test(expected=IllegalStateException.class)
@@ -347,7 +347,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
 
         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
 
         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData());
+                eq(actorSelection(actorRef)), eqReadData());
 
         expectBatchedModificationsReady(actorRef);
 
 
         expectBatchedModificationsReady(actorRef);
 
@@ -443,7 +443,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData());
+                eq(actorSelection(actorRef)), eqReadData());
 
         expectBatchedModifications(actorRef, 1);
 
 
         expectBatchedModifications(actorRef, 1);
 
@@ -471,7 +471,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData());
+                eq(actorSelection(actorRef)), eqReadData());
 
         expectBatchedModificationsReady(actorRef, true);
 
 
         expectBatchedModificationsReady(actorRef, true);
 
@@ -501,7 +501,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData());
+                eq(actorSelection(actorRef)), eqReadData());
 
         expectBatchedModificationsReady(actorRef, true);
 
 
         expectBatchedModificationsReady(actorRef, true);
 
@@ -761,7 +761,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData());
+                eq(actorSelection(actorRef)), eqReadData());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
@@ -773,75 +773,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                 eq(actorSelection(actorRef)), isA(CloseTransaction.class));
     }
 
                 eq(actorSelection(actorRef)), isA(CloseTransaction.class));
     }
 
-
-    /**
-     * Method to test a local Tx actor. The Tx paths are matched to decide if the
-     * Tx actor is local or not. This is done by mocking the Tx actor path
-     * and the caller paths and ensuring that the paths have the remote-address format
-     *
-     * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
-     * the paths returned for the actors for all the tests are not qualified remote paths.
-     * Hence are treated as non-local/remote actors. In short, all tests except
-     * few below run for remote actors
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testLocalTxActorRead() throws Exception {
-        setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
-        doReturn(true).when(mockActorContext).isPathLocal(anyString());
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
-
-        // negative test case with null as the reply
-        doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-            any(ActorSelection.class), eqReadData());
-
-        Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
-            TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
-
-        assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
-
-        // test case with node as read data reply
-        NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
-            any(ActorSelection.class), eqReadData());
-
-        readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
-
-        assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
-
-        assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
-
-        // test for local data exists
-        doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
-            any(ActorSelection.class), eqDataExists());
-
-        boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
-
-        assertEquals("Exists response", true, exists);
-    }
-
-    @Test
-    public void testLocalTxActorReady() throws Exception {
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-        doReturn(true).when(mockActorContext).isPathLocal(anyString());
-
-        expectBatchedModificationsReady(actorRef, true);
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
-        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
-        assertTrue(ready instanceof SingleCommitCohortProxy);
-
-        verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
-    }
-
     private static interface TransactionProxyOperation {
         void run(TransactionProxy transactionProxy);
     }
     private static interface TransactionProxyOperation {
         void run(TransactionProxy transactionProxy);
     }
@@ -895,8 +826,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
                  eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
 
         executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
                  eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
 
-        doReturn(true).when(mockActorContext).isPathLocal(actorPath);
-
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
         long start = System.nanoTime();
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
         long start = System.nanoTime();
@@ -941,8 +870,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
                         eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
 
                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
                         eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
 
-        doReturn(true).when(mockActorContext).isPathLocal(anyString());
-
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
         long start = System.nanoTime();
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
         long start = System.nanoTime();
@@ -1486,13 +1413,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
 
         doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
         YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
 
         doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
+                eq(actorSelection(actorRef)), eqReadData(writePath2));
 
         doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
 
         doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
+                eq(actorSelection(actorRef)), eqReadData(mergePath2));
 
         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
 
         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedDataExists());
+                eq(actorSelection(actorRef)), eqDataExists());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
@@ -1534,19 +1461,19 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
 
         inOrder.verify(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
 
         inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
+                eq(actorSelection(actorRef)), eqReadData(writePath2));
 
         inOrder.verify(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
 
         inOrder.verify(mockActorContext).executeOperationAsync(
 
         inOrder.verify(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
 
         inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
+                eq(actorSelection(actorRef)), eqReadData(mergePath2));
 
         inOrder.verify(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
 
         inOrder.verify(mockActorContext).executeOperationAsync(
 
         inOrder.verify(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
 
         inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedDataExists());
+                eq(actorSelection(actorRef)), eqDataExists());
     }
 
     @Test
     }
 
     @Test
@@ -1608,8 +1535,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).
                 when(mockActorContext).findPrimaryShardAsync(eq(shardName));
 
         doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).
                 when(mockActorContext).findPrimaryShardAsync(eq(shardName));
 
-        doReturn(true).when(mockActorContext).isPathLocal(shardActorRef.path().toString());
-
         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
 
         doReturn(actorSystem.actorSelection(txActorRef.path())).
         ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
 
         doReturn(actorSystem.actorSelection(txActorRef.path())).
@@ -1620,6 +1545,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                         eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
 
         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
                         eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
 
         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(txActorRef)), eqSerializedReadData(YangInstanceIdentifier.builder().build()));
+                eq(actorSelection(txActorRef)), eqReadData(YangInstanceIdentifier.builder().build()));
     }
 }
     }
 }
index 9dbeb5dfe1041cf936650071e3908f8c531239e9..2ca4bcab4a2db3c2f77ed9faa91c21de29189e3a 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore.compat;
 
  */
 package org.opendaylight.controller.cluster.datastore.compat;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
@@ -19,6 +20,8 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.dispatch.Futures;
 import akka.util.Timeout;
 import akka.actor.Props;
 import akka.dispatch.Futures;
 import akka.util.Timeout;
+import com.google.common.base.Optional;
+import java.util.concurrent.TimeUnit;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
 import org.opendaylight.controller.cluster.datastore.AbstractTransactionProxyTest;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
 import org.opendaylight.controller.cluster.datastore.AbstractTransactionProxyTest;
@@ -26,17 +29,22 @@ import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.TransactionProxy;
 import org.opendaylight.controller.cluster.datastore.TransactionType;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.TransactionProxy;
 import org.opendaylight.controller.cluster.datastore.TransactionType;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.DataExists;
+import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
 /**
  * TransactionProxy unit tests for backwards compatibility with pre-Boron versions.
  *
  * @author Thomas Pantelis
  */
 
 /**
  * TransactionProxy unit tests for backwards compatibility with pre-Boron versions.
  *
  * @author Thomas Pantelis
  */
+@SuppressWarnings("resource")
 public class PreBoronTransactionProxyTest extends AbstractTransactionProxyTest {
 
     private CreateTransaction eqLegacyCreateTransaction(final TransactionType type) {
 public class PreBoronTransactionProxyTest extends AbstractTransactionProxyTest {
 
     private CreateTransaction eqLegacyCreateTransaction(final TransactionType type) {
@@ -56,14 +64,39 @@ public class PreBoronTransactionProxyTest extends AbstractTransactionProxyTest {
         return argThat(matcher);
     }
 
         return argThat(matcher);
     }
 
-    private CreateTransactionReply legacyCreateTransactionReply(ActorRef actorRef, int transactionVersion){
-        return CreateTransactionReply.newBuilder()
+    private ShardTransactionMessages.CreateTransactionReply legacyCreateTransactionReply(ActorRef actorRef,
+            int transactionVersion){
+        return ShardTransactionMessages.CreateTransactionReply.newBuilder()
             .setTransactionActorPath(actorRef.path().toString())
             .setTransactionId("txn-1")
             .setMessageVersion(transactionVersion)
             .build();
     }
 
             .setTransactionActorPath(actorRef.path().toString())
             .setTransactionId("txn-1")
             .setMessageVersion(transactionVersion)
             .build();
     }
 
+    private ReadData eqLegacySerializedReadData(final YangInstanceIdentifier path) {
+        ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
+            @Override
+            public boolean matches(Object argument) {
+                return ShardTransactionMessages.ReadData.class.equals(argument.getClass()) &&
+                       ReadData.fromSerializable(argument).getPath().equals(path);
+            }
+        };
+
+        return argThat(matcher);
+    }
+
+    private DataExists eqLegacySerializedDataExists() {
+        ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
+            @Override
+            public boolean matches(Object argument) {
+                return ShardTransactionMessages.DataExists.class.equals(argument.getClass()) &&
+                       DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
+            }
+        };
+
+        return argThat(matcher);
+    }
+
     private ActorRef setupPreBoronActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
             TransactionType type) {
         ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem,
     private ActorRef setupPreBoronActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
             TransactionType type) {
         ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem,
@@ -83,23 +116,51 @@ public class PreBoronTransactionProxyTest extends AbstractTransactionProxyTest {
         }
 
         return txActorRef;
         }
 
         return txActorRef;
-
     }
 
     @Test
     public void testClose() throws Exception{
         ActorRef actorRef = setupPreBoronActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
     }
 
     @Test
     public void testClose() throws Exception{
         ActorRef actorRef = setupPreBoronActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
-        doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData());
+        expectBatchedModifications(actorRef, 1);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
-        transactionProxy.read(TestModel.TEST_PATH);
+        transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
         transactionProxy.close();
 
         verify(mockActorContext).sendOperationAsync(
                 eq(actorSelection(actorRef)), isA(ShardTransactionMessages.CloseTransaction.class));
     }
 
         transactionProxy.close();
 
         verify(mockActorContext).sendOperationAsync(
                 eq(actorSelection(actorRef)), isA(ShardTransactionMessages.CloseTransaction.class));
     }
+
+    @Test
+    public void testRead() throws Exception{
+        ActorRef actorRef = setupPreBoronActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+        NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqLegacySerializedReadData(TestModel.TEST_PATH));
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
+
+        Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
+                TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
+
+        assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+        assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
+    }
+
+    @Test
+    public void testExists() throws Exception{
+        ActorRef actorRef = setupPreBoronActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+        doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqLegacySerializedDataExists());
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
+
+        Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+        assertEquals("Exists response", true, exists);
+    }
 }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DataExistsReplyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DataExistsReplyTest.java
new file mode 100644 (file)
index 0000000..a4c9039
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+
+/**
+ * Unit tests for DataExistsReply.
+ *
+ * @author Thomas Pantelis
+ */
+public class DataExistsReplyTest {
+
+    @Test
+    public void testSerialization() {
+        DataExistsReply expected = new DataExistsReply(true, DataStoreVersions.CURRENT_VERSION);
+
+        Object serialized = expected.toSerializable();
+        assertEquals("Serialized type", DataExistsReply.class, serialized.getClass());
+
+        DataExistsReply actual = DataExistsReply.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+        assertEquals("exists", expected.exists(), actual.exists());
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
+    }
+
+    @Test
+    public void testSerializationWithPreBoronVersion() {
+        DataExistsReply expected = new DataExistsReply(true, DataStoreVersions.LITHIUM_VERSION);
+
+        Object serialized = expected.toSerializable();
+        assertEquals("Serialized type", ShardTransactionMessages.DataExistsReply.class, serialized.getClass());
+
+        DataExistsReply actual = DataExistsReply.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+        assertEquals("exists", expected.exists(), actual.exists());
+        assertEquals("getVersion", DataStoreVersions.LITHIUM_VERSION, actual.getVersion());
+    }
+
+    @Test
+    public void testIsSerializedType() {
+        assertEquals("isSerializedType", true, DataExistsReply.isSerializedType(
+                ShardTransactionMessages.DataExistsReply.newBuilder().setExists(true).build()));
+
+        assertEquals("isSerializedType", true, DataExistsReply.isSerializedType(new DataExistsReply()));
+        assertEquals("isSerializedType", false, DataExistsReply.isSerializedType(new Object()));
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DataExistsTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DataExistsTest.java
new file mode 100644 (file)
index 0000000..2c16e81
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+
+/**
+ * Unit tests for DataExists.
+ *
+ * @author Thomas Pantelis
+ */
+public class DataExistsTest {
+
+    @Test
+    public void testSerialization() {
+        DataExists expected = new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION);
+
+        Object serialized = expected.toSerializable();
+        assertEquals("Serialized type", DataExists.class, serialized.getClass());
+
+        DataExists actual = DataExists.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+        assertEquals("getPath", expected.getPath(), actual.getPath());
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
+    }
+
+    @Test
+    public void testSerializationWithPreBoronVersion() {
+        DataExists expected = new DataExists(TestModel.TEST_PATH, DataStoreVersions.LITHIUM_VERSION);
+
+        Object serialized = expected.toSerializable();
+        assertEquals("Serialized type", ShardTransactionMessages.DataExists.class, serialized.getClass());
+
+        DataExists actual = DataExists.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+        assertEquals("getPath", expected.getPath(), actual.getPath());
+        assertEquals("getVersion", DataStoreVersions.LITHIUM_VERSION, actual.getVersion());
+    }
+
+    @Test
+    public void testIsSerializedType() {
+        assertEquals("isSerializedType", true, DataExists.isSerializedType(
+                ShardTransactionMessages.DataExists.newBuilder().setInstanceIdentifierPathArguments(
+                        InstanceIdentifierUtils.toSerializable(TestModel.TEST_PATH)).build()));
+
+        assertEquals("isSerializedType", true, DataExists.isSerializedType(new DataExists()));
+        assertEquals("isSerializedType", false, DataExists.isSerializedType(new Object()));
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataTest.java
new file mode 100644 (file)
index 0000000..5690218
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+
+/**
+ * Unit tests for ReadData.
+ *
+ * @author Thomas Pantelis
+ */
+public class ReadDataTest {
+
+    @Test
+    public void testSerialization() {
+        ReadData expected = new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION);
+
+        Object serialized = expected.toSerializable();
+        assertEquals("Serialized type", ReadData.class, serialized.getClass());
+
+        ReadData actual = ReadData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+        assertEquals("getPath", expected.getPath(), actual.getPath());
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
+    }
+
+    @Test
+    public void testSerializationWithPreBoronVersion() {
+        ReadData expected = new ReadData(TestModel.TEST_PATH, DataStoreVersions.LITHIUM_VERSION);
+
+        Object serialized = expected.toSerializable();
+        assertEquals("Serialized type", ShardTransactionMessages.ReadData.class, serialized.getClass());
+
+        ReadData actual = ReadData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+        assertEquals("getPath", expected.getPath(), actual.getPath());
+        assertEquals("getVersion", DataStoreVersions.LITHIUM_VERSION, actual.getVersion());
+    }
+
+    @Test
+    public void testIsSerializedType() {
+        assertEquals("isSerializedType", true, ReadData.isSerializedType(
+                ShardTransactionMessages.ReadData.newBuilder().setInstanceIdentifierPathArguments(
+                        InstanceIdentifierUtils.toSerializable(TestModel.TEST_PATH)).build()));
+
+        assertEquals("isSerializedType", true, ReadData.isSerializedType(new ReadData()));
+        assertEquals("isSerializedType", false, ReadData.isSerializedType(new Object()));
+    }
+}