private final ActorContext actorContext;
private final ActorSelection actor;
- private final boolean isTxActorLocal;
private final short remoteTransactionVersion;
private final OperationLimiter limiter;
private int totalBatchedModificationsSent;
protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor,
- ActorContext actorContext, boolean isTxActorLocal,
- short remoteTransactionVersion, OperationLimiter limiter) {
+ ActorContext actorContext, short remoteTransactionVersion, OperationLimiter limiter) {
super(identifier);
this.limiter = Preconditions.checkNotNull(limiter);
this.actor = actor;
this.actorContext = actorContext;
- this.isTxActorLocal = isTxActorLocal;
this.remoteTransactionVersion = remoteTransactionVersion;
}
}
protected Future<Object> executeOperationAsync(SerializableMessage msg) {
- return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable()));
+ return completeOperation(actorContext.executeOperationAsync(getActor(), msg.toSerializable()));
}
@Override
}
};
- Future<Object> future = executeOperationAsync(readCmd);
+ Future<Object> future = executeOperationAsync(readCmd.asVersion(remoteTransactionVersion));
future.onComplete(onComplete, actorContext.getClientDispatcher());
}
private TransactionContext createValidTransactionContext(ActorSelection transactionActor, String transactionPath,
short remoteTransactionVersion) {
- // TxActor is always created where the leader of the shard is.
- // Check if TxActor is created in the same node
- boolean isTxActorLocal = getActorContext().isPathLocal(transactionPath);
final TransactionContext ret = new RemoteTransactionContext(transactionContextWrapper.getIdentifier(),
- transactionActor, getActorContext(), isTxActorLocal, remoteTransactionVersion,
- transactionContextWrapper.getLimiter());
+ transactionActor, getActorContext(), remoteTransactionVersion, transactionContextWrapper.getLimiter());
if(parent.getType() == TransactionType.READ_ONLY) {
TransactionContextCleanup.track(this, ret);
@Override
public void handleReceive(Object message) throws Exception {
- if(message instanceof ReadData) {
- readData(transaction, (ReadData) message, !SERIALIZED_REPLY);
-
- } else if (message instanceof DataExists) {
- dataExists(transaction, (DataExists) message, !SERIALIZED_REPLY);
- } else if (message instanceof CreateSnapshot) {
+ if (message instanceof CreateSnapshot) {
createSnapshot();
- } else if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
- readData(transaction, ReadData.fromSerializable(message), SERIALIZED_REPLY);
-
- } else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) {
- dataExists(transaction, DataExists.fromSerializable(message), SERIALIZED_REPLY);
+ } else if(ReadData.isSerializedType(message)) {
+ readData(transaction, ReadData.fromSerializable(message));
+ } else if(DataExists.isSerializedType(message)) {
+ dataExists(transaction, DataExists.fromSerializable(message));
} else {
super.handleReceive(message);
@Override
public void handleReceive(Object message) throws Exception {
- if (message instanceof ReadData) {
- readData((ReadData) message, !SERIALIZED_REPLY);
-
- } else if (message instanceof DataExists) {
- dataExists((DataExists) message, !SERIALIZED_REPLY);
-
- } else if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
- readData(ReadData.fromSerializable(message), SERIALIZED_REPLY);
-
- } else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) {
- dataExists(DataExists.fromSerializable(message), SERIALIZED_REPLY);
+ if(ReadData.isSerializedType(message)) {
+ readData(ReadData.fromSerializable(message));
+ } else if(DataExists.isSerializedType(message)) {
+ dataExists((DataExists) message);
} else {
super.handleReceive(message);
}
* </p>
*/
public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
-
- protected static final boolean SERIALIZED_REPLY = true;
-
private final ActorRef shardActor;
private final ShardStats shardStats;
private final String transactionID;
return ret;
}
- protected void readData(AbstractShardDataTreeTransaction<?> transaction, ReadData message,
- final boolean returnSerialized) {
-
+ protected void readData(AbstractShardDataTreeTransaction<?> transaction, ReadData message) {
if (checkClosed(transaction)) {
return;
}
final YangInstanceIdentifier path = message.getPath();
Optional<NormalizedNode<?, ?>> optional = transaction.getSnapshot().readNode(path);
- ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), clientTxVersion);
- sender().tell((returnSerialized ? readDataReply.toSerializable(): readDataReply), self());
+ ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), message.getVersion());
+ sender().tell(readDataReply.toSerializable(), self());
}
- protected void dataExists(AbstractShardDataTreeTransaction<?> transaction, DataExists message,
- final boolean returnSerialized) {
-
+ protected void dataExists(AbstractShardDataTreeTransaction<?> transaction, DataExists message) {
if (checkClosed(transaction)) {
return;
}
final YangInstanceIdentifier path = message.getPath();
boolean exists = transaction.getSnapshot().readNode(path).isPresent();
- DataExistsReply dataExistsReply = DataExistsReply.create(exists);
- getSender().tell(returnSerialized ? dataExistsReply.toSerializable() :
- dataExistsReply, getSelf());
+ getSender().tell(new DataExistsReply(exists, message.getVersion()).toSerializable(), getSelf());
}
private static class ShardTransactionCreator implements Creator<ShardTransaction> {
}
}
- protected final void dataExists(DataExists message, final boolean returnSerialized) {
- super.dataExists(transaction, message, returnSerialized);
+ protected final void dataExists(DataExists message) {
+ super.dataExists(transaction, message);
}
- protected final void readData(ReadData message, final boolean returnSerialized) {
- super.readData(transaction, message, returnSerialized);
+ protected final void readData(ReadData message) {
+ super.readData(transaction, message);
}
private boolean checkClosed() {
@Override
public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
- return executeRead(shardNameFromIdentifier(path), new DataExists(path));
+ return executeRead(shardNameFromIdentifier(path), new DataExists(path, DataStoreVersions.CURRENT_VERSION));
}
private <T> CheckedFuture<T, ReadFailedException> executeRead(String shardName, final AbstractRead<T> readCmd) {
private CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> singleShardRead(
final String shardName, final YangInstanceIdentifier path) {
- return executeRead(shardName, new ReadData(path));
+ return executeRead(shardName, new ReadData(path, DataStoreVersions.CURRENT_VERSION));
}
private CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readAllData() {
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.SettableFuture;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
* @author gwu
*
*/
-public abstract class AbstractRead<T> implements SerializableMessage {
- private final YangInstanceIdentifier path;
+public abstract class AbstractRead<T> extends VersionedExternalizableMessage {
+ private static final long serialVersionUID = 1L;
- public AbstractRead(final YangInstanceIdentifier path) {
+ private YangInstanceIdentifier path;
+
+ protected AbstractRead() {
+ }
+
+ public AbstractRead(final YangInstanceIdentifier path, final short version) {
+ super(version);
this.path = path;
}
return path;
}
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ super.readExternal(in);
+ path = SerializationUtils.deserializePath(in);
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ super.writeExternal(out);
+ SerializationUtils.serializePath(path, out);
+ }
+
+ public AbstractRead<T> asVersion(short version) {
+ return version == getVersion() ? this : newInstance(version);
+ }
+
public abstract CheckedFuture<T, ReadFailedException> apply(DOMStoreReadTransaction readDelegate);
public abstract void processResponse(Object reponse, SettableFuture<T> promise);
+ protected abstract AbstractRead<T> newInstance(short withVersion);
}
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.SettableFuture;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
public class DataExists extends AbstractRead<Boolean> {
+ private static final long serialVersionUID = 1L;
- public static final Class<ShardTransactionMessages.DataExists> SERIALIZABLE_CLASS =
- ShardTransactionMessages.DataExists.class;
-
- public DataExists(final YangInstanceIdentifier path) {
- super(path);
+ public DataExists() {
}
- @Override public Object toSerializable() {
- return ShardTransactionMessages.DataExists.newBuilder()
- .setInstanceIdentifierPathArguments(
- InstanceIdentifierUtils.toSerializable(getPath())).build();
+ public DataExists(final YangInstanceIdentifier path, final short version) {
+ super(path, version);
}
- public static DataExists fromSerializable(final Object serializable){
- ShardTransactionMessages.DataExists o = (ShardTransactionMessages.DataExists) serializable;
- return new DataExists(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()));
+ @Override
+ public Object toSerializable() {
+ if(getVersion() >= DataStoreVersions.BORON_VERSION) {
+ return this;
+ } else {
+ return ShardTransactionMessages.DataExists.newBuilder().setInstanceIdentifierPathArguments(
+ InstanceIdentifierUtils.toSerializable(getPath())).build();
+ }
}
@Override
@Override
public void processResponse(Object response, SettableFuture<Boolean> returnFuture) {
- if(response instanceof DataExistsReply) {
- returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists()));
-
- } else if(response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
+ if(DataExistsReply.isSerializedType(response)) {
returnFuture.set(Boolean.valueOf(DataExistsReply.fromSerializable(response).exists()));
-
} else {
returnFuture.setException(new ReadFailedException("Invalid response checking exists for path " + getPath()));
}
}
+ @Override
+ protected AbstractRead<Boolean> newInstance(short withVersion) {
+ return new DataExists(getPath(), withVersion);
+ }
+
+ public static DataExists fromSerializable(final Object serializable){
+ if(serializable instanceof DataExists) {
+ return (DataExists)serializable;
+ } else {
+ ShardTransactionMessages.DataExists o = (ShardTransactionMessages.DataExists) serializable;
+ return new DataExists(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()),
+ DataStoreVersions.LITHIUM_VERSION);
+ }
+ }
+
+ public static boolean isSerializedType(Object message) {
+ return message instanceof DataExists || message instanceof ShardTransactionMessages.DataExists;
+ }
}
package org.opendaylight.controller.cluster.datastore.messages;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
-public class DataExistsReply implements SerializableMessage {
- public static final Class<ShardTransactionMessages.DataExistsReply> SERIALIZABLE_CLASS =
- ShardTransactionMessages.DataExistsReply.class;
+public class DataExistsReply extends VersionedExternalizableMessage {
+ private static final long serialVersionUID = 1L;
- private static final DataExistsReply TRUE = new DataExistsReply(true, null);
- private static final DataExistsReply FALSE = new DataExistsReply(false, null);
private static final ShardTransactionMessages.DataExistsReply SERIALIZABLE_TRUE =
ShardTransactionMessages.DataExistsReply.newBuilder().setExists(true).build();
private static final ShardTransactionMessages.DataExistsReply SERIALIZABLE_FALSE =
ShardTransactionMessages.DataExistsReply.newBuilder().setExists(false).build();
- private final boolean exists;
+ private boolean exists;
- private DataExistsReply(final boolean exists, final Void dummy) {
- this.exists = exists;
+ public DataExistsReply() {
}
- public static DataExistsReply create(final boolean exists) {
- return exists ? TRUE : FALSE;
+ public DataExistsReply(final boolean exists, final short version) {
+ super(version);
+ this.exists = exists;
}
public boolean exists() {
return exists;
}
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ super.readExternal(in);
+ exists = in.readBoolean();
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ super.writeExternal(out);
+ out.writeBoolean(exists);
+ }
+
@Override
public Object toSerializable() {
- return exists ? SERIALIZABLE_TRUE : SERIALIZABLE_FALSE;
+ if(getVersion() >= DataStoreVersions.BORON_VERSION) {
+ return this;
+ } else {
+ return exists ? SERIALIZABLE_TRUE : SERIALIZABLE_FALSE;
+ }
}
public static DataExistsReply fromSerializable(final Object serializable) {
- ShardTransactionMessages.DataExistsReply o = (ShardTransactionMessages.DataExistsReply) serializable;
- return create(o.getExists());
+ if(serializable instanceof DataExistsReply) {
+ return (DataExistsReply)serializable;
+ } else {
+ ShardTransactionMessages.DataExistsReply o = (ShardTransactionMessages.DataExistsReply) serializable;
+ return new DataExistsReply(o.getExists(), DataStoreVersions.LITHIUM_VERSION);
+ }
+ }
+
+ public static boolean isSerializedType(Object message) {
+ return message instanceof DataExistsReply || message instanceof ShardTransactionMessages.DataExistsReply;
}
}
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.SettableFuture;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
public class ReadData extends AbstractRead<Optional<NormalizedNode<?, ?>>> {
- public static final Class<ShardTransactionMessages.ReadData> SERIALIZABLE_CLASS = ShardTransactionMessages.ReadData.class;
+ private static final long serialVersionUID = 1L;
- public ReadData(final YangInstanceIdentifier path) {
- super(path);
+ public ReadData() {
}
- @Override
- public Object toSerializable() {
- return ShardTransactionMessages.ReadData.newBuilder()
- .setInstanceIdentifierPathArguments(InstanceIdentifierUtils.toSerializable(getPath())).build();
+ public ReadData(final YangInstanceIdentifier path, short version) {
+ super(path, version);
}
- public static ReadData fromSerializable(final Object serializable) {
- ShardTransactionMessages.ReadData o = (ShardTransactionMessages.ReadData)serializable;
- return new ReadData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()));
+ @Override
+ public Object toSerializable() {
+ if(getVersion() >= DataStoreVersions.BORON_VERSION) {
+ return this;
+ } else {
+ return ShardTransactionMessages.ReadData.newBuilder().setInstanceIdentifierPathArguments(
+ InstanceIdentifierUtils.toSerializable(getPath())).build();
+ }
}
@Override
returnFuture.setException(new ReadFailedException("Invalid response reading data for path " + getPath()));
}
}
+
+ @Override
+ protected AbstractRead<Optional<NormalizedNode<?, ?>>> newInstance(short withVersion) {
+ return new ReadData(getPath(), withVersion);
+ }
+
+ public static ReadData fromSerializable(final Object serializable) {
+ if(serializable instanceof ReadData) {
+ return (ReadData)serializable;
+ } else {
+ ShardTransactionMessages.ReadData o = (ShardTransactionMessages.ReadData)serializable;
+ return new ReadData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()),
+ DataStoreVersions.LITHIUM_VERSION);
+ }
+ }
+
+ public static boolean isSerializedType(Object message) {
+ return message instanceof ReadData || message instanceof ShardTransactionMessages.ReadData;
+ }
}
return argThat(matcher);
}
- protected DataExists eqSerializedDataExists() {
- ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
- @Override
- public boolean matches(Object argument) {
- return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
- DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
- }
- };
-
- return argThat(matcher);
- }
-
protected DataExists eqDataExists() {
ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
@Override
return argThat(matcher);
}
- protected ReadData eqSerializedReadData() {
- return eqSerializedReadData(TestModel.TEST_PATH);
- }
-
- protected ReadData eqSerializedReadData(final YangInstanceIdentifier path) {
- ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
- @Override
- public boolean matches(Object argument) {
- return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
- ReadData.fromSerializable(argument).getPath().equals(path);
- }
- };
-
- return argThat(matcher);
- }
-
protected ReadData eqReadData() {
return eqReadData(TestModel.TEST_PATH);
}
}
protected Future<DataExistsReply> dataExistsReply(boolean exists) {
- return Futures.successful(DataExistsReply.create(exists));
+ return Futures.successful(new DataExistsReply(exists, DataStoreVersions.CURRENT_VERSION));
}
protected Future<BatchedModificationsReply> batchedModificationsReply(int count) {
doReturn(primaryShardInfoReply(actorSystem, actorRef, transactionVersion)).
when(mockActorContext).findPrimaryShardAsync(eq(shardName));
- doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
-
return actorRef;
}
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
-
import akka.actor.ActorSelection;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures;
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction).read(yangInstanceIdentifier);
- localTransactionContext.executeRead(new ReadData(yangInstanceIdentifier), SettableFuture.<Optional<NormalizedNode<?,?>>>create());
+ localTransactionContext.executeRead(new ReadData(yangInstanceIdentifier, DataStoreVersions.CURRENT_VERSION),
+ SettableFuture.<Optional<NormalizedNode<?,?>>>create());
verify(readWriteTransaction).read(yangInstanceIdentifier);
}
public void testExists() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
doReturn(Futures.immediateCheckedFuture(true)).when(readWriteTransaction).exists(yangInstanceIdentifier);
- localTransactionContext.executeRead(new DataExists(yangInstanceIdentifier), SettableFuture.<Boolean>create());
+ localTransactionContext.executeRead(new DataExists(yangInstanceIdentifier, DataStoreVersions.CURRENT_VERSION),
+ SettableFuture.<Boolean>create());
verify(readWriteTransaction).exists(yangInstanceIdentifier);
}
limiter.acquire(permits);
int availablePermits = 0;
- limiter.onComplete(null, DataExistsReply.create(true));
+ limiter.onComplete(null, new DataExistsReply(true, DataStoreVersions.CURRENT_VERSION));
assertEquals("availablePermits", ++availablePermits, limiter.availablePermits());
- limiter.onComplete(null, DataExistsReply.create(true));
+ limiter.onComplete(null, new DataExistsReply(true, DataStoreVersions.CURRENT_VERSION));
assertEquals("availablePermits", ++availablePermits, limiter.availablePermits());
limiter.onComplete(null, new IllegalArgumentException());
final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
- getSystem().actorSelection(createReply.getTransactionPath()).tell(new ReadData(path), getRef());
+ getSystem().actorSelection(createReply.getTransactionPath()).tell(
+ new ReadData(path, DataStoreVersions.CURRENT_VERSION), getRef());
final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
assertEquals("Read node", containerNode, readReply.getNormalizedNode());
"testNegativeReadWithReadOnlyTransactionClosed");
Future<Object> future = akka.pattern.Patterns.ask(subject,
- new ReadData(YangInstanceIdentifier.EMPTY), 3000);
+ new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
Await.result(future, Duration.create(3, TimeUnit.SECONDS));
subject.underlyingActor().getDOMStoreTransaction().abort();
- future = akka.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.EMPTY), 3000);
+ future = akka.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.EMPTY,
+ DataStoreVersions.CURRENT_VERSION), 3000);
Await.result(future, Duration.create(3, TimeUnit.SECONDS));
}
"testNegativeReadWithReadWriteTransactionClosed");
Future<Object> future = akka.pattern.Patterns.ask(subject,
- new ReadData(YangInstanceIdentifier.EMPTY), 3000);
+ new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
Await.result(future, Duration.create(3, TimeUnit.SECONDS));
subject.underlyingActor().getDOMStoreTransaction().abort();
- future = akka.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.EMPTY), 3000);
+ future = akka.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.EMPTY,
+ DataStoreVersions.CURRENT_VERSION), 3000);
Await.result(future, Duration.create(3, TimeUnit.SECONDS));
}
"testNegativeExistsWithReadWriteTransactionClosed");
Future<Object> future = akka.pattern.Patterns.ask(subject,
- new DataExists(YangInstanceIdentifier.EMPTY), 3000);
+ new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
Await.result(future, Duration.create(3, TimeUnit.SECONDS));
subject.underlyingActor().getDOMStoreTransaction().abort();
- future = akka.pattern.Patterns.ask(subject, new DataExists(YangInstanceIdentifier.EMPTY), 3000);
+ future = akka.pattern.Patterns.ask(subject,
+ new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
Await.result(future, Duration.create(3, TimeUnit.SECONDS));
}
}
}
private void testOnReceiveReadData(final ActorRef transaction) {
- //serialized read
- transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
- getRef());
-
- Object replySerialized = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
-
- assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
-
- // unserialized read
- transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
+ transaction.tell(new ReadData(YangInstanceIdentifier.builder().build(),
+ DataStoreVersions.CURRENT_VERSION),getRef());
ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
}
private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
- // serialized read
- transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
-
- Object replySerialized = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
-
- assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
-
- // unserialized read
- transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
+ transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
}
private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
- transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
- getRef());
-
- Object replySerialized = expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
- assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
-
- // unserialized read
- transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
+ transaction.tell(new DataExists(YangInstanceIdentifier.builder().build(),
+ DataStoreVersions.CURRENT_VERSION),getRef());
DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
}
private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
- transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
-
- Object replySerialized = expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
- assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
-
- // unserialized read
- transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
+ transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
}};
}
+ @Test
+ public void testOnReceivePreBoronReadData() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(), createShard(),
+ "testOnReceivePreBoronReadData");
+
+ transaction.tell(new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.LITHIUM_VERSION).
+ toSerializable(), getRef());
+
+ Object replySerialized = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
+ assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
+ }};
+ }
+
+ @Test
+ public void testOnReceivePreBoronDataExists() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(), createShard(),
+ "testOnReceivePreBoronDataExists");
+
+ transaction.tell(new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.LITHIUM_VERSION).
+ toSerializable(), getRef());
+
+ Object replySerialized = expectMsgClass(duration("5 seconds"),
+ ShardTransactionMessages.DataExistsReply.class);
+ assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
+ }};
+ }
+
public static class TestException extends RuntimeException {
private static final long serialVersionUID = 1L;
}
TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedReadData());
+ eq(actorSelection(actorRef)), eqReadData());
Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedReadData());
+ eq(actorSelection(actorRef)), eqReadData());
readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
doReturn(Futures.successful(new Object())).when(mockActorContext).
- executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
+ executeOperationAsync(eq(actorSelection(actorRef)), eqReadData());
TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
+ executeOperationAsync(eq(actorSelection(actorRef)), eqReadData());
TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
expectBatchedModifications(actorRef, 1);
doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedReadData());
+ eq(actorSelection(actorRef)), eqReadData());
TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
eq(actorSelection(actorRef)), isA(BatchedModifications.class));
inOrder.verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedReadData());
+ eq(actorSelection(actorRef)), eqReadData());
}
@Test(expected=IllegalStateException.class)
TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedDataExists());
+ eq(actorSelection(actorRef)), eqDataExists());
Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
assertEquals("Exists response", false, exists);
doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedDataExists());
+ eq(actorSelection(actorRef)), eqDataExists());
exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
doReturn(Futures.successful(new Object())).when(mockActorContext).
- executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
+ executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists());
TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
+ executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists());
TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
expectBatchedModifications(actorRef, 1);
doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedDataExists());
+ eq(actorSelection(actorRef)), eqDataExists());
TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
eq(actorSelection(actorRef)), isA(BatchedModifications.class));
inOrder.verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedDataExists());
+ eq(actorSelection(actorRef)), eqDataExists());
}
@Test(expected=IllegalStateException.class)
eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedReadData());
+ eq(actorSelection(actorRef)), eqReadData());
expectBatchedModificationsReady(actorRef);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedReadData());
+ eq(actorSelection(actorRef)), eqReadData());
expectBatchedModifications(actorRef, 1);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedReadData());
+ eq(actorSelection(actorRef)), eqReadData());
expectBatchedModificationsReady(actorRef, true);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedReadData());
+ eq(actorSelection(actorRef)), eqReadData());
expectBatchedModificationsReady(actorRef, true);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedReadData());
+ eq(actorSelection(actorRef)), eqReadData());
TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
eq(actorSelection(actorRef)), isA(CloseTransaction.class));
}
-
- /**
- * Method to test a local Tx actor. The Tx paths are matched to decide if the
- * Tx actor is local or not. This is done by mocking the Tx actor path
- * and the caller paths and ensuring that the paths have the remote-address format
- *
- * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
- * the paths returned for the actors for all the tests are not qualified remote paths.
- * Hence are treated as non-local/remote actors. In short, all tests except
- * few below run for remote actors
- *
- * @throws Exception
- */
- @Test
- public void testLocalTxActorRead() throws Exception {
- setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
- doReturn(true).when(mockActorContext).isPathLocal(anyString());
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
-
- // negative test case with null as the reply
- doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqReadData());
-
- Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
- TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
-
- assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
-
- // test case with node as read data reply
- NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqReadData());
-
- readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
-
- assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
-
- assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
-
- // test for local data exists
- doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqDataExists());
-
- boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
-
- assertEquals("Exists response", true, exists);
- }
-
- @Test
- public void testLocalTxActorReady() throws Exception {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
- doReturn(true).when(mockActorContext).isPathLocal(anyString());
-
- expectBatchedModificationsReady(actorRef, true);
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
- NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
- DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
- assertTrue(ready instanceof SingleCommitCohortProxy);
-
- verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
- }
-
private static interface TransactionProxyOperation {
void run(TransactionProxy transactionProxy);
}
executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
- doReturn(true).when(mockActorContext).isPathLocal(actorPath);
-
TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
long start = System.nanoTime();
executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
- doReturn(true).when(mockActorContext).isPathLocal(anyString());
-
TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
long start = System.nanoTime();
YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
+ eq(actorSelection(actorRef)), eqReadData(writePath2));
doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
+ eq(actorSelection(actorRef)), eqReadData(mergePath2));
doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedDataExists());
+ eq(actorSelection(actorRef)), eqDataExists());
TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
eq(actorSelection(actorRef)), isA(BatchedModifications.class));
inOrder.verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
+ eq(actorSelection(actorRef)), eqReadData(writePath2));
inOrder.verify(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), isA(BatchedModifications.class));
inOrder.verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
+ eq(actorSelection(actorRef)), eqReadData(mergePath2));
inOrder.verify(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), isA(BatchedModifications.class));
inOrder.verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedDataExists());
+ eq(actorSelection(actorRef)), eqDataExists());
}
@Test
doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).
when(mockActorContext).findPrimaryShardAsync(eq(shardName));
- doReturn(true).when(mockActorContext).isPathLocal(shardActorRef.path().toString());
-
ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
doReturn(actorSystem.actorSelection(txActorRef.path())).
eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(txActorRef)), eqSerializedReadData(YangInstanceIdentifier.builder().build()));
+ eq(actorSelection(txActorRef)), eqReadData(YangInstanceIdentifier.builder().build()));
}
}
*/
package org.opendaylight.controller.cluster.datastore.compat;
+import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import akka.actor.Props;
import akka.dispatch.Futures;
import akka.util.Timeout;
+import com.google.common.base.Optional;
+import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.opendaylight.controller.cluster.datastore.AbstractTransactionProxyTest;
import org.opendaylight.controller.cluster.datastore.TransactionProxy;
import org.opendaylight.controller.cluster.datastore.TransactionType;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.DataExists;
+import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
/**
* TransactionProxy unit tests for backwards compatibility with pre-Boron versions.
*
* @author Thomas Pantelis
*/
+@SuppressWarnings("resource")
public class PreBoronTransactionProxyTest extends AbstractTransactionProxyTest {
private CreateTransaction eqLegacyCreateTransaction(final TransactionType type) {
return argThat(matcher);
}
- private CreateTransactionReply legacyCreateTransactionReply(ActorRef actorRef, int transactionVersion){
- return CreateTransactionReply.newBuilder()
+ private ShardTransactionMessages.CreateTransactionReply legacyCreateTransactionReply(ActorRef actorRef,
+ int transactionVersion){
+ return ShardTransactionMessages.CreateTransactionReply.newBuilder()
.setTransactionActorPath(actorRef.path().toString())
.setTransactionId("txn-1")
.setMessageVersion(transactionVersion)
.build();
}
+ private ReadData eqLegacySerializedReadData(final YangInstanceIdentifier path) {
+ ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
+ @Override
+ public boolean matches(Object argument) {
+ return ShardTransactionMessages.ReadData.class.equals(argument.getClass()) &&
+ ReadData.fromSerializable(argument).getPath().equals(path);
+ }
+ };
+
+ return argThat(matcher);
+ }
+
+ private DataExists eqLegacySerializedDataExists() {
+ ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
+ @Override
+ public boolean matches(Object argument) {
+ return ShardTransactionMessages.DataExists.class.equals(argument.getClass()) &&
+ DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
+ }
+ };
+
+ return argThat(matcher);
+ }
+
private ActorRef setupPreBoronActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
TransactionType type) {
ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem,
}
return txActorRef;
-
}
@Test
public void testClose() throws Exception{
ActorRef actorRef = setupPreBoronActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
- doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedReadData());
+ expectBatchedModifications(actorRef, 1);
TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
- transactionProxy.read(TestModel.TEST_PATH);
+ transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
transactionProxy.close();
verify(mockActorContext).sendOperationAsync(
eq(actorSelection(actorRef)), isA(ShardTransactionMessages.CloseTransaction.class));
}
+
+ @Test
+ public void testRead() throws Exception{
+ ActorRef actorRef = setupPreBoronActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+ NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqLegacySerializedReadData(TestModel.TEST_PATH));
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
+
+ Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
+ TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
+
+ assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+ assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
+ }
+
+ @Test
+ public void testExists() throws Exception{
+ ActorRef actorRef = setupPreBoronActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+ doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqLegacySerializedDataExists());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
+
+ Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+ assertEquals("Exists response", true, exists);
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+
+/**
+ * Unit tests for DataExistsReply.
+ *
+ * @author Thomas Pantelis
+ */
+public class DataExistsReplyTest {
+
+ @Test
+ public void testSerialization() {
+ DataExistsReply expected = new DataExistsReply(true, DataStoreVersions.CURRENT_VERSION);
+
+ Object serialized = expected.toSerializable();
+ assertEquals("Serialized type", DataExistsReply.class, serialized.getClass());
+
+ DataExistsReply actual = DataExistsReply.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+ assertEquals("exists", expected.exists(), actual.exists());
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
+ }
+
+ @Test
+ public void testSerializationWithPreBoronVersion() {
+ DataExistsReply expected = new DataExistsReply(true, DataStoreVersions.LITHIUM_VERSION);
+
+ Object serialized = expected.toSerializable();
+ assertEquals("Serialized type", ShardTransactionMessages.DataExistsReply.class, serialized.getClass());
+
+ DataExistsReply actual = DataExistsReply.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+ assertEquals("exists", expected.exists(), actual.exists());
+ assertEquals("getVersion", DataStoreVersions.LITHIUM_VERSION, actual.getVersion());
+ }
+
+ @Test
+ public void testIsSerializedType() {
+ assertEquals("isSerializedType", true, DataExistsReply.isSerializedType(
+ ShardTransactionMessages.DataExistsReply.newBuilder().setExists(true).build()));
+
+ assertEquals("isSerializedType", true, DataExistsReply.isSerializedType(new DataExistsReply()));
+ assertEquals("isSerializedType", false, DataExistsReply.isSerializedType(new Object()));
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+
+/**
+ * Unit tests for DataExists.
+ *
+ * @author Thomas Pantelis
+ */
+public class DataExistsTest {
+
+ @Test
+ public void testSerialization() {
+ DataExists expected = new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION);
+
+ Object serialized = expected.toSerializable();
+ assertEquals("Serialized type", DataExists.class, serialized.getClass());
+
+ DataExists actual = DataExists.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+ assertEquals("getPath", expected.getPath(), actual.getPath());
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
+ }
+
+ @Test
+ public void testSerializationWithPreBoronVersion() {
+ DataExists expected = new DataExists(TestModel.TEST_PATH, DataStoreVersions.LITHIUM_VERSION);
+
+ Object serialized = expected.toSerializable();
+ assertEquals("Serialized type", ShardTransactionMessages.DataExists.class, serialized.getClass());
+
+ DataExists actual = DataExists.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+ assertEquals("getPath", expected.getPath(), actual.getPath());
+ assertEquals("getVersion", DataStoreVersions.LITHIUM_VERSION, actual.getVersion());
+ }
+
+ @Test
+ public void testIsSerializedType() {
+ assertEquals("isSerializedType", true, DataExists.isSerializedType(
+ ShardTransactionMessages.DataExists.newBuilder().setInstanceIdentifierPathArguments(
+ InstanceIdentifierUtils.toSerializable(TestModel.TEST_PATH)).build()));
+
+ assertEquals("isSerializedType", true, DataExists.isSerializedType(new DataExists()));
+ assertEquals("isSerializedType", false, DataExists.isSerializedType(new Object()));
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+
+/**
+ * Unit tests for ReadData.
+ *
+ * @author Thomas Pantelis
+ */
+public class ReadDataTest {
+
+ @Test
+ public void testSerialization() {
+ ReadData expected = new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION);
+
+ Object serialized = expected.toSerializable();
+ assertEquals("Serialized type", ReadData.class, serialized.getClass());
+
+ ReadData actual = ReadData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+ assertEquals("getPath", expected.getPath(), actual.getPath());
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
+ }
+
+ @Test
+ public void testSerializationWithPreBoronVersion() {
+ ReadData expected = new ReadData(TestModel.TEST_PATH, DataStoreVersions.LITHIUM_VERSION);
+
+ Object serialized = expected.toSerializable();
+ assertEquals("Serialized type", ShardTransactionMessages.ReadData.class, serialized.getClass());
+
+ ReadData actual = ReadData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+ assertEquals("getPath", expected.getPath(), actual.getPath());
+ assertEquals("getVersion", DataStoreVersions.LITHIUM_VERSION, actual.getVersion());
+ }
+
+ @Test
+ public void testIsSerializedType() {
+ assertEquals("isSerializedType", true, ReadData.isSerializedType(
+ ShardTransactionMessages.ReadData.newBuilder().setInstanceIdentifierPathArguments(
+ InstanceIdentifierUtils.toSerializable(TestModel.TEST_PATH)).build()));
+
+ assertEquals("isSerializedType", true, ReadData.isSerializedType(new ReadData()));
+ assertEquals("isSerializedType", false, ReadData.isSerializedType(new Object()));
+ }
+}