import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.dispatch.Futures;
+import akka.testkit.JavaTestKit;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.CheckedFuture;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
+import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.isA;
import static org.mockito.Mockito.times;
import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
@SuppressWarnings("resource")
-public class TransactionProxyTest extends AbstractActorTest {
+public class TransactionProxyTest {
@SuppressWarnings("serial")
static class TestException extends RuntimeException {
CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
}
+ private static ActorSystem system;
+
private final Configuration configuration = new MockConfiguration();
@Mock
String memberName = "mock-member";
+ @BeforeClass
+ public static void setUpClass() throws IOException {
+
+ Config config = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder().
+ put("akka.actor.default-dispatcher.type",
+ "akka.testkit.CallingThreadDispatcherConfigurator").build()).
+ withFallback(ConfigFactory.load());
+ system = ActorSystem.create("test", config);
+ }
+
+ @AfterClass
+ public static void tearDownClass() throws IOException {
+ JavaTestKit.shutdownActorSystem(system);
+ system = null;
+ }
+
@Before
public void setUp(){
MockitoAnnotations.initMocks(this);
schemaContext = TestModel.createTestContext();
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().build();
+
doReturn(getSystem()).when(mockActorContext).getActorSystem();
doReturn(memberName).when(mockActorContext).getCurrentMemberName();
doReturn(schemaContext).when(mockActorContext).getSchemaContext();
doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
+ doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
+ doReturn(dataStoreContext).when(mockActorContext).getDatastoreContext();
ShardStrategyFactory.setConfiguration(configuration);
}
+ private ActorSystem getSystem() {
+ return system;
+ }
+
private CreateTransaction eqCreateTransaction(final String memberName,
final TransactionType type) {
ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
return getSystem().actorSelection(actorRef.path());
}
- private CreateTransactionReply createTransactionReply(ActorRef actorRef){
+ private CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){
return CreateTransactionReply.newBuilder()
.setTransactionActorPath(actorRef.path().toString())
- .setTransactionId("txn-1").build();
+ .setTransactionId("txn-1")
+ .setMessageVersion(transactionVersion)
+ .build();
}
- private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
+ private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type, int transactionVersion) {
ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
doReturn(actorSystem.actorSelection(actorRef.path())).
when(mockActorContext).actorSelection(actorRef.path().toString());
- doReturn(Optional.of(actorSystem.actorSelection(actorRef.path()))).
- when(mockActorContext).findPrimaryShard(eq(DefaultShardStrategy.DEFAULT_SHARD));
+ doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
+ when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
- doReturn(createTransactionReply(actorRef)).when(mockActorContext).
- executeOperation(eq(actorSystem.actorSelection(actorRef.path())),
+ doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext).
+ executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
eqCreateTransaction(memberName, type));
doReturn(false).when(mockActorContext).isLocalPath(actorRef.path().toString());
return actorRef;
}
+ private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
+ return setupActorContextWithInitialCreateTransaction(actorSystem, type, CreateTransaction.CURRENT_VERSION);
+ }
+
+
private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
throws Throwable {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
doReturn(Futures.successful(new Object())).when(mockActorContext).
- executeOperationAsync(any(ActorSelection.class), any());
+ executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeOperationAsync(any(ActorSelection.class), any());
+ executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
if (exToThrow instanceof PrimaryNotFoundException) {
- doReturn(Optional.absent()).when(mockActorContext).findPrimaryShard(anyString());
+ doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
} else {
- doReturn(Optional.of(getSystem().actorSelection(actorRef.path()))).
- when(mockActorContext).findPrimaryShard(anyString());
+ doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
+ when(mockActorContext).findPrimaryShardAsync(anyString());
}
- doThrow(exToThrow).when(mockActorContext).executeOperation(any(ActorSelection.class), any());
+
+ doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
transactionProxy.read(TestModel.TEST_PATH);
}
+ @Test(expected=IllegalArgumentException.class)
+ public void testInvalidCreateTransactionReply() throws Throwable {
+ ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
+
+ doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
+ actorSelection(actorRef.path().toString());
+
+ doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
+ when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+
+ doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
+ eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY));
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
+
+ propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
+ }
+
@Test
public void testExists() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
doReturn(Futures.successful(new Object())).when(mockActorContext).
- executeOperationAsync(any(ActorSelection.class), any());
+ executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeOperationAsync(any(ActorSelection.class), any());
+ executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadyForwardCompatibility() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, 0);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedReadData());
+
+ doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+
+ doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+
+ doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
+ eq(actorRef.path().toString()));
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_WRITE);
+
+ transactionProxy.read(TestModel.TEST_PATH);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ WriteDataReply.SERIALIZABLE_CLASS);
+
+ verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+
+ verify(mockActorContext).resolvePath(eq(actorRef.path().toString()),
+ eq(actorRef.path().toString()));
+ }
+
@SuppressWarnings("unchecked")
@Test
public void testReadyWithRecordingOperationFailure() throws Exception {
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+ verifyCohortFutures(proxy, TestException.class);
+
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
-
- verifyCohortFutures(proxy, TestException.class);
}
@SuppressWarnings("unchecked")
@Test
public void testReadyWithInitialCreateTransactionFailure() throws Exception {
- doReturn(Optional.absent()).when(mockActorContext).findPrimaryShard(anyString());
-// doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
-// anyString(), any());
+ doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when(
+ mockActorContext).findPrimaryShardAsync(anyString());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
doReturn(actorSystem.actorSelection(shardActorRef.path())).
when(mockActorContext).actorSelection(shardActorRef.path().toString());
- doReturn(Optional.of(actorSystem.actorSelection(shardActorRef.path()))).
- when(mockActorContext).findPrimaryShard(eq(DefaultShardStrategy.DEFAULT_SHARD));
+ doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
+ when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
.setTransactionActorPath(actorPath)
.build();
- doReturn(createTransactionReply).when(mockActorContext).
- executeOperation(eq(actorSystem.actorSelection(shardActorRef.path())),
+ doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
+ executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
eqCreateTransaction(memberName, READ_ONLY));
doReturn(true).when(mockActorContext).isLocalPath(actorPath);
doReturn(actorSystem.actorSelection(shardActorRef.path())).
when(mockActorContext).actorSelection(shardActorRef.path().toString());
- doReturn(Optional.of(actorSystem.actorSelection(shardActorRef.path()))).
- when(mockActorContext).findPrimaryShard(eq(DefaultShardStrategy.DEFAULT_SHARD));
+ doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
+ when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
.setTransactionActorPath(actorPath)
.build();
- doReturn(createTransactionReply).when(mockActorContext).
- executeOperation(eq(actorSystem.actorSelection(shardActorRef.path())),
+ doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
+ executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
eqCreateTransaction(memberName, WRITE_ONLY));
doReturn(true).when(mockActorContext).isLocalPath(actorPath);