package org.opendaylight.controller.cluster.datastore;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-import akka.actor.ActorPath;
+import com.google.common.util.concurrent.CheckedFuture;
+
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.dispatch.Futures;
-
import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
-
import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
+
import java.util.List;
import java.util.concurrent.TimeUnit;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.isA;
import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
@SuppressWarnings("resource")
public class TransactionProxyTest extends AbstractActorTest {
return argThat(matcher);
}
- private Future<Object> readyTxReply(ActorPath path) {
+ private Future<Object> readyTxReply(String path) {
return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
}
ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
doReturn(getSystem().actorSelection(actorRef.path())).
when(mockActorContext).actorSelection(actorRef.path().toString());
+
+ doReturn(Optional.of(getSystem().actorSelection(actorRef.path()))).
+ when(mockActorContext).findPrimaryShard(eq(DefaultShardStrategy.DEFAULT_SHARD));
+
doReturn(createTransactionReply(actorRef)).when(mockActorContext).
- executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD),
+ executeOperation(eq(getSystem().actorSelection(actorRef.path())),
eqCreateTransaction(memberName, type));
- doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(
- anyString(), eq(actorRef.path().toString()));
- doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString());
-
return actorRef;
}
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
- doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqReadData());
Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqReadData());
readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
setupActorContextWithInitialCreateTransaction(READ_ONLY);
doReturn(Futures.successful(new Object())).when(mockActorContext).
- executeRemoteOperationAsync(any(ActorSelection.class), any());
+ executeOperationAsync(any(ActorSelection.class), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
setupActorContextWithInitialCreateTransaction(READ_ONLY);
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeRemoteOperationAsync(any(ActorSelection.class), any());
+ executeOperationAsync(any(ActorSelection.class), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
throws Throwable {
+ ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
- doThrow(exToThrow).when(mockActorContext).executeShardOperation(
- anyString(), any());
+ if (exToThrow instanceof PrimaryNotFoundException) {
+ doReturn(Optional.absent()).when(mockActorContext).findPrimaryShard(anyString());
+ } else {
+ doReturn(Optional.of(getSystem().actorSelection(actorRef.path()))).
+ when(mockActorContext).findPrimaryShard(anyString());
+ }
+ doThrow(exToThrow).when(mockActorContext).executeOperation(any(ActorSelection.class), any());
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
}
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
+ executeOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
- doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqReadData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
try {
propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
} finally {
- verify(mockActorContext, times(0)).executeRemoteOperationAsync(
+ verify(mockActorContext, times(0)).executeOperationAsync(
eq(actorSelection(actorRef)), eqReadData());
}
}
NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqWriteData(expectedNode));
- doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqReadData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
- doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqDataExists());
Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
assertEquals("Exists response", false, exists);
- doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqDataExists());
exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
setupActorContextWithInitialCreateTransaction(READ_ONLY);
doReturn(Futures.successful(new Object())).when(mockActorContext).
- executeRemoteOperationAsync(any(ActorSelection.class), any());
+ executeOperationAsync(any(ActorSelection.class), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
setupActorContextWithInitialCreateTransaction(READ_ONLY);
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeRemoteOperationAsync(any(ActorSelection.class), any());
+ executeOperationAsync(any(ActorSelection.class), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
+ executeOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
- doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqDataExists());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
try {
propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
} finally {
- verify(mockActorContext, times(0)).executeRemoteOperationAsync(
+ verify(mockActorContext, times(0)).executeOperationAsync(
eq(actorSelection(actorRef)), eqDataExists());
}
}
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
- doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqDataExists());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
- verify(mockActorContext).executeRemoteOperationAsync(
+ verify(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
- verify(mockActorContext).executeRemoteOperationAsync(
+ verify(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
public void testDelete() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
- doReturn(deleteDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqDeleteData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
transactionProxy.delete(TestModel.TEST_PATH);
- verify(mockActorContext).executeRemoteOperationAsync(
+ verify(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqDeleteData());
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
DeleteDataReply.SERIALIZABLE_CLASS);
}
- private void verifyCohortPathFutures(ThreePhaseCommitCohortProxy proxy,
- Object... expReplies) throws Exception {
+ private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
+ Object... expReplies) throws Exception {
assertEquals("getReadyOperationFutures size", expReplies.length,
- proxy.getCohortPathFutures().size());
+ proxy.getCohortFutures().size());
int i = 0;
- for( Future<ActorPath> future: proxy.getCohortPathFutures()) {
+ for( Future<ActorSelection> future: proxy.getCohortFutures()) {
assertNotNull("Ready operation Future is null", future);
Object expReply = expReplies[i++];
- if(expReply instanceof ActorPath) {
- ActorPath actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
- assertEquals("Cohort actor path", expReply, actual);
+ if(expReply instanceof ActorSelection) {
+ ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
+ assertEquals("Cohort actor path", (ActorSelection) expReply, actual);
} else {
// Expecting exception.
try {
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqReadData());
- doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
- doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
WriteDataReply.SERIALIZABLE_CLASS);
- verifyCohortPathFutures(proxy, actorRef.path());
+ verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
}
@SuppressWarnings("unchecked")
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
+ executeOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
- doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
- verifyCohortPathFutures(proxy, TestException.class);
+ verifyCohortFutures(proxy, TestException.class);
}
@SuppressWarnings("unchecked")
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeRemoteOperationAsync(eq(actorSelection(actorRef)),
+ executeOperationAsync(eq(actorSelection(actorRef)),
isA(ReadyTransaction.SERIALIZABLE_CLASS));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
MergeDataReply.SERIALIZABLE_CLASS);
- verifyCohortPathFutures(proxy, TestException.class);
+ verifyCohortFutures(proxy, TestException.class);
}
@Test
public void testReadyWithInitialCreateTransactionFailure() throws Exception {
- doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
- anyString(), any());
+ doReturn(Optional.absent()).when(mockActorContext).findPrimaryShard(anyString());
+// doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
+// anyString(), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
- verifyCohortPathFutures(proxy, PrimaryNotFoundException.class);
+ verifyCohortFutures(proxy, PrimaryNotFoundException.class);
}
@SuppressWarnings("unchecked")
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
doReturn(Futures.successful(new Object())).when(mockActorContext).
- executeRemoteOperationAsync(eq(actorSelection(actorRef)),
+ executeOperationAsync(eq(actorSelection(actorRef)),
isA(ReadyTransaction.SERIALIZABLE_CLASS));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
- verifyCohortPathFutures(proxy, IllegalArgumentException.class);
+ verifyCohortFutures(proxy, IllegalArgumentException.class);
}
@Test
public void testClose() throws Exception{
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
- doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqReadData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
transactionProxy.close();
- verify(mockActorContext).sendRemoteOperationAsync(
+ verify(mockActorContext).sendOperationAsync(
eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
}
}