package org.opendaylight.controller.cluster.datastore;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import com.google.common.util.concurrent.CheckedFuture;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.argThat;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.isA;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
@SuppressWarnings("resource")
public class TransactionProxyTest {
ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
@Override
public boolean matches(Object argument) {
- CreateTransaction obj = CreateTransaction.fromSerializable(argument);
- return obj.getTransactionId().startsWith(memberName) &&
- obj.getTransactionType() == type.ordinal();
+ if(CreateTransaction.SERIALIZABLE_CLASS.equals(argument.getClass())) {
+ CreateTransaction obj = CreateTransaction.fromSerializable(argument);
+ return obj.getTransactionId().startsWith(memberName) &&
+ obj.getTransactionType() == type.ordinal();
+ }
+
+ return false;
}
};
}
private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite) {
+ return eqSerializedWriteData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
+ }
+
+ private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite,
+ final int transactionVersion) {
ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
@Override
public boolean matches(Object argument) {
- if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
- return false;
+ if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
+ WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
+ (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
+ ShardTransactionMessages.WriteData.class.equals(argument.getClass()))) {
+
+ WriteData obj = WriteData.fromSerializable(argument);
+ return obj.getPath().equals(TestModel.TEST_PATH) &&
+ obj.getData().equals(nodeToWrite);
}
- WriteData obj = WriteData.fromSerializable(argument, schemaContext);
- return obj.getPath().equals(TestModel.TEST_PATH) &&
- obj.getData().equals(nodeToWrite);
+ return false;
}
};
}
private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite) {
+ return eqSerializedMergeData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
+ }
+
+ private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite,
+ final int transactionVersion) {
ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
@Override
public boolean matches(Object argument) {
- if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
- return false;
+ if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
+ MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
+ (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
+ ShardTransactionMessages.MergeData.class.equals(argument.getClass()))) {
+
+ MergeData obj = MergeData.fromSerializable(argument);
+ return obj.getPath().equals(TestModel.TEST_PATH) &&
+ obj.getData().equals(nodeToWrite);
}
- MergeData obj = MergeData.fromSerializable(argument, schemaContext);
- return obj.getPath().equals(TestModel.TEST_PATH) &&
- obj.getData().equals(nodeToWrite);
+ return false;
}
};
return Futures.successful((Object)new ReadyTransactionReply(path));
}
+ private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data,
+ short transactionVersion) {
+ return Futures.successful(new ReadDataReply(data).toSerializable(transactionVersion));
+ }
private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data) {
- return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable());
+ return readSerializedDataReply(data, DataStoreVersions.CURRENT_VERSION);
}
private Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
- return Futures.successful(new ReadDataReply(schemaContext, data));
+ return Futures.successful(new ReadDataReply(data));
}
private Future<Object> dataExistsSerializedReply(boolean exists) {
return Futures.successful(new DataExistsReply(exists));
}
+ private Future<Object> writeSerializedDataReply(short version) {
+ return Futures.successful(new WriteDataReply().toSerializable(version));
+ }
+
private Future<Object> writeSerializedDataReply() {
- return Futures.successful(new WriteDataReply().toSerializable());
+ return writeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
}
private Future<WriteDataReply> writeDataReply() {
return Futures.successful(new WriteDataReply());
}
+ private Future<Object> mergeSerializedDataReply(short version) {
+ return Futures.successful(new MergeDataReply().toSerializable(version));
+ }
+
private Future<Object> mergeSerializedDataReply() {
- return Futures.successful(new MergeDataReply().toSerializable());
+ return mergeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
}
private Future<MergeDataReply> mergeDataReply() {
.build();
}
- private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type, int transactionVersion) {
+ private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
+ TransactionType type, int transactionVersion) {
ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
doReturn(actorSystem.actorSelection(actorRef.path())).
when(mockActorContext).actorSelection(actorRef.path().toString());
executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
eqCreateTransaction(memberName, type));
- doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
-
return actorRef;
}
private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
- return setupActorContextWithInitialCreateTransaction(actorSystem, type, CreateTransaction.CURRENT_VERSION);
+ return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION);
}
}
@Test(expected=IllegalStateException.class)
- public void testxistsPreConditionCheck() {
+ public void testExistsPreConditionCheck() {
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- WriteDataReply.SERIALIZABLE_CLASS);
+ WriteDataReply.class);
}
@Test(expected=IllegalStateException.class)
eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- MergeDataReply.SERIALIZABLE_CLASS);
+ MergeDataReply.class);
}
@Test
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- WriteDataReply.SERIALIZABLE_CLASS);
+ WriteDataReply.class);
verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
}
- @Test
- public void testReadyForwardCompatibility() throws Exception {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, 0);
+ private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(),
+ READ_WRITE, version);
- NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+ doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedReadData());
- doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+ doReturn(writeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedWriteData(testNode, version));
+
+ doReturn(mergeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedMergeData(testNode, version));
doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
eq(actorRef.path().toString()));
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_WRITE);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
- transactionProxy.read(TestModel.TEST_PATH);
+ Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(TestModel.TEST_PATH).
+ get(5, TimeUnit.SECONDS);
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+ assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+ assertEquals("Response NormalizedNode", testNode, readOptional.get());
+
+ transactionProxy.write(TestModel.TEST_PATH, testNode);
+
+ transactionProxy.merge(TestModel.TEST_PATH, testNode);
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- WriteDataReply.SERIALIZABLE_CLASS);
+ ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class);
verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+ return actorRef;
+ }
+
+ @Test
+ public void testCompatibilityWithBaseHeliumVersion() throws Exception {
+ ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.BASE_HELIUM_VERSION);
+
verify(mockActorContext).resolvePath(eq(actorRef.path().toString()),
eq(actorRef.path().toString()));
}
+ @Test
+ public void testCompatibilityWithHeliumR1Version() throws Exception {
+ ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.HELIUM_1_VERSION);
+
+ verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()),
+ eq(actorRef.path().toString()));
+ }
+
@Test
public void testReadyWithRecordingOperationFailure() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
verifyCohortFutures(proxy, TestException.class);
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
+ MergeDataReply.class, TestException.class);
}
@Test
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- MergeDataReply.SERIALIZABLE_CLASS);
+ MergeDataReply.class);
verifyCohortFutures(proxy, TestException.class);
}