* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.netconf.topology.singleton.impl.tx;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
+import akka.actor.Status.Failure;
+import akka.actor.Status.Success;
+import akka.dispatch.Futures;
+import akka.pattern.AskTimeoutException;
import akka.testkit.TestProbe;
+import akka.testkit.javadsl.TestKit;
import akka.util.Timeout;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListenableFuture;
import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import org.junit.After;
-import org.junit.Assert;
+import java.util.concurrent.TimeoutException;
+import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
-import org.opendaylight.controller.config.util.xml.DocumentedException;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
+import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
+import org.opendaylight.netconf.api.DocumentedException;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceId;
import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.opendaylight.yangtools.yang.common.ErrorSeverity;
+import org.opendaylight.yangtools.yang.common.ErrorTag;
+import org.opendaylight.yangtools.yang.common.ErrorType;
import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import scala.concurrent.Promise;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
public class ProxyReadWriteTransactionTest {
- private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
+ private static final FiniteDuration EXP_NO_MESSAGE_TIMEOUT = Duration.apply(300, TimeUnit.MILLISECONDS);
+ private static final RemoteDeviceId DEVICE_ID =
+ new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830));
+ private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.empty();
private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
- private ActorSystem system;
+ private static ActorSystem system = ActorSystem.apply();
private TestProbe masterActor;
private ContainerNode node;
- private ProxyReadWriteTransaction tx;
@Before
- public void setUp() throws Exception {
- system = ActorSystem.apply();
+ public void setUp() {
masterActor = new TestProbe(system);
- final RemoteDeviceId id = new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830));
node = Builders.containerBuilder()
- .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("cont")))
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont")))
.build();
- tx = new ProxyReadWriteTransaction(masterActor.ref(), id, system, Timeout.apply(5, TimeUnit.SECONDS));
}
- @After
- public void tearDown() throws Exception {
- JavaTestKit.shutdownActorSystem(system, null, true);
+ @AfterClass
+ public static void staticTearDown() {
+ TestKit.shutdownActorSystem(system, true);
+ }
+
+ private ProxyReadWriteTransaction newSuccessfulProxyTx() {
+ return newSuccessfulProxyTx(Timeout.apply(5, TimeUnit.SECONDS));
+ }
+
+ private ProxyReadWriteTransaction newSuccessfulProxyTx(final Timeout timeout) {
+ return new ProxyReadWriteTransaction(DEVICE_ID, Futures.successful(masterActor.ref()),
+ system.dispatcher(), timeout);
}
@Test
- public void testCancel() throws Exception {
- final Future<Boolean> submit = Executors.newSingleThreadExecutor().submit(() -> tx.cancel());
+ public void testCancel() {
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+ tx.cancel();
masterActor.expectMsgClass(CancelRequest.class);
- masterActor.reply(true);
- Assert.assertTrue(submit.get());
+ masterActor.reply(Boolean.TRUE);
}
@Test
- public void testCancelSubmitted() throws Exception {
- final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
- masterActor.expectMsgClass(SubmitRequest.class);
- masterActor.reply(new SubmitReply());
- submitFuture.checkedGet();
- final Future<Boolean> submit = Executors.newSingleThreadExecutor().submit(() -> tx.cancel());
- masterActor.expectNoMsg();
- Assert.assertFalse(submit.get());
+ public void testCommit() throws InterruptedException, ExecutionException, TimeoutException {
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+ commit(tx);
}
@Test
- public void testSubmit() throws Exception {
- final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
- masterActor.expectMsgClass(SubmitRequest.class);
- masterActor.reply(new SubmitReply());
- submitFuture.checkedGet();
+ public void testCommitAfterCancel() throws InterruptedException, ExecutionException, TimeoutException {
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+ commit(tx);
+ assertFalse(tx.cancel());
}
@Test
- public void testDoubleSubmit() throws Exception {
- final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
- masterActor.expectMsgClass(SubmitRequest.class);
- masterActor.reply(new SubmitReply());
- submitFuture.checkedGet();
+ public void testDoubleCommit() throws InterruptedException, ExecutionException, TimeoutException {
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+ commit(tx);
try {
- tx.submit().checkedGet();
- Assert.fail("Should throw IllegalStateException");
+ tx.commit();
+ fail("Should throw IllegalStateException");
} catch (final IllegalStateException e) {
- masterActor.expectNoMsg();
+ masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
}
}
@Test
- public void testCommit() throws Exception {
- final ListenableFuture<RpcResult<TransactionStatus>> submitFuture = tx.commit();
- masterActor.expectMsgClass(SubmitRequest.class);
- masterActor.reply(new SubmitReply());
- Assert.assertEquals(TransactionStatus.SUBMITED, submitFuture.get().getResult());
- }
+ public void testDelete() {
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
- @Test
- public void testDelete() throws Exception {
tx.delete(STORE, PATH);
- masterActor.expectMsgClass(DeleteRequest.class);
+ final DeleteRequest deleteRequest = masterActor.expectMsgClass(DeleteRequest.class);
+ assertEquals(STORE, deleteRequest.getStore());
+ assertEquals(PATH, deleteRequest.getPath());
}
@Test
- public void testDeleteClosed() throws Exception {
- submit();
+ public void testDeleteAfterCommit() throws InterruptedException, ExecutionException, TimeoutException {
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+ commit(tx);
try {
tx.delete(STORE, PATH);
- Assert.fail("Should throw IllegalStateException");
+ fail("Should throw IllegalStateException");
} catch (final IllegalStateException e) {
- masterActor.expectNoMsg();
+ masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
}
}
@Test
- public void testPut() throws Exception {
+ public void testPut() {
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
tx.put(STORE, PATH, node);
- masterActor.expectMsgClass(PutRequest.class);
+ final PutRequest putRequest = masterActor.expectMsgClass(PutRequest.class);
+ assertEquals(STORE, putRequest.getStore());
+ assertEquals(PATH, putRequest.getNormalizedNodeMessage().getIdentifier());
+ assertEquals(node, putRequest.getNormalizedNodeMessage().getNode());
}
@Test
- public void testPutClosed() throws Exception {
- submit();
+ public void testPutAfterCommit() throws InterruptedException, ExecutionException, TimeoutException {
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+ commit(tx);
try {
tx.put(STORE, PATH, node);
- Assert.fail("Should throw IllegalStateException");
+ fail("Should throw IllegalStateException");
} catch (final IllegalStateException e) {
- masterActor.expectNoMsg();
+ masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
}
}
@Test
- public void testMerge() throws Exception {
+ public void testMerge() {
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
tx.merge(STORE, PATH, node);
- masterActor.expectMsgClass(MergeRequest.class);
+ final MergeRequest mergeRequest = masterActor.expectMsgClass(MergeRequest.class);
+ assertEquals(STORE, mergeRequest.getStore());
+ assertEquals(PATH, mergeRequest.getNormalizedNodeMessage().getIdentifier());
+ assertEquals(node, mergeRequest.getNormalizedNodeMessage().getNode());
}
@Test
- public void testMergeClosed() throws Exception {
- submit();
+ public void testMergeAfterCommit() throws InterruptedException, ExecutionException, TimeoutException {
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+ commit(tx);
try {
tx.merge(STORE, PATH, node);
- Assert.fail("Should throw IllegalStateException");
+ fail("Should throw IllegalStateException");
} catch (final IllegalStateException e) {
- masterActor.expectNoMsg();
+ masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
}
}
- @Test
- public void testGetIdentifier() throws Exception {
- Assert.assertEquals(tx, tx.getIdentifier());
- }
-
- private void submit() throws TransactionCommitFailedException {
- final CheckedFuture<Void, TransactionCommitFailedException> submit = tx.submit();
+ private void commit(final ProxyReadWriteTransaction tx)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ final ListenableFuture<?> submit = tx.commit();
masterActor.expectMsgClass(SubmitRequest.class);
- masterActor.reply(new SubmitReply());
- submit.checkedGet();
+ masterActor.reply(new Success(null));
+ submit.get(5, TimeUnit.SECONDS);
}
@Test
public void testRead() throws Exception {
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
- masterActor.expectMsgClass(ReadRequest.class);
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+ final ListenableFuture<Optional<NormalizedNode>> read = tx.read(STORE, PATH);
+ final ReadRequest readRequest = masterActor.expectMsgClass(ReadRequest.class);
+ assertEquals(STORE, readRequest.getStore());
+ assertEquals(PATH, readRequest.getPath());
+
masterActor.reply(new NormalizedNodeMessage(PATH, node));
- final Optional<NormalizedNode<?, ?>> result = read.checkedGet();
- Assert.assertTrue(result.isPresent());
- Assert.assertEquals(node, result.get());
+ final Optional<NormalizedNode> result = read.get(5, TimeUnit.SECONDS);
+ assertTrue(result.isPresent());
+ assertEquals(node, result.get());
}
@Test
public void testReadEmpty() throws Exception {
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+ final ListenableFuture<Optional<NormalizedNode>> read = tx.read(STORE, PATH);
masterActor.expectMsgClass(ReadRequest.class);
masterActor.reply(new EmptyReadResponse());
- final Optional<NormalizedNode<?, ?>> result = read.checkedGet();
- Assert.assertFalse(result.isPresent());
+ final Optional<NormalizedNode> result = read.get(5, TimeUnit.SECONDS);
+ assertFalse(result.isPresent());
}
- @Test(expected = ReadFailedException.class)
- public void testReadFail() throws Exception {
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+ @Test
+ public void testReadFailure() throws InterruptedException, TimeoutException {
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+ final ListenableFuture<Optional<NormalizedNode>> read = tx.read(STORE, PATH);
masterActor.expectMsgClass(ReadRequest.class);
- masterActor.reply(new RuntimeException("fail"));
- read.checkedGet();
+ final RuntimeException mockEx = new RuntimeException("fail");
+ masterActor.reply(new Failure(mockEx));
+
+ try {
+ read.get(5, TimeUnit.SECONDS);
+ fail("Exception should be thrown");
+ } catch (final ExecutionException e) {
+ Throwable cause = e.getCause();
+ assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
+ assertEquals(mockEx, cause.getCause());
+ }
}
@Test
public void testExists() throws Exception {
- final CheckedFuture<Boolean, ReadFailedException> read = tx.exists(STORE, PATH);
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+ final ListenableFuture<Boolean> read = tx.exists(STORE, PATH);
+ final ExistsRequest existsRequest = masterActor.expectMsgClass(ExistsRequest.class);
+ assertEquals(STORE, existsRequest.getStore());
+ assertEquals(PATH, existsRequest.getPath());
+
+ masterActor.reply(Boolean.TRUE);
+ final Boolean result = read.get(5, TimeUnit.SECONDS);
+ assertTrue(result);
+ }
+
+ @Test
+ public void testExistsFailure() throws InterruptedException, TimeoutException {
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+ final ListenableFuture<Boolean> read = tx.exists(STORE, PATH);
masterActor.expectMsgClass(ExistsRequest.class);
- masterActor.reply(true);
- final Boolean result = read.checkedGet();
- Assert.assertTrue(result);
+ final RuntimeException mockEx = new RuntimeException("fail");
+ masterActor.reply(new Failure(mockEx));
+
+ try {
+ read.get(5, TimeUnit.SECONDS);
+ fail("Exception should be thrown");
+ } catch (final ExecutionException e) {
+ Throwable cause = e.getCause();
+ assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
+ assertEquals(mockEx, cause.getCause());
+ }
}
- @Test(expected = ReadFailedException.class)
- public void testExistsFail() throws Exception {
- final CheckedFuture<Boolean, ReadFailedException> read = tx.exists(STORE, PATH);
+ @Test
+ public void testFutureOperationsWithMasterDown() throws InterruptedException, TimeoutException {
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx(Timeout.apply(500, TimeUnit.MILLISECONDS));
+
+ ListenableFuture<?> future = tx.read(STORE, PATH);
+ masterActor.expectMsgClass(ReadRequest.class);
+
+ // master doesn't reply
+ try {
+ future.get(5, TimeUnit.SECONDS);
+ fail("Exception should be thrown");
+ } catch (final ExecutionException e) {
+ Throwable cause = e.getCause();
+ assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
+ verifyDocumentedException(cause.getCause());
+ }
+
+ future = tx.exists(STORE, PATH);
masterActor.expectMsgClass(ExistsRequest.class);
- masterActor.reply(new RuntimeException("fail"));
- read.checkedGet();
+
+ // master doesn't reply
+ try {
+ future.get(5, TimeUnit.SECONDS);
+ fail("Exception should be thrown");
+ } catch (final ExecutionException e) {
+ Throwable cause = e.getCause();
+ assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
+ verifyDocumentedException(cause.getCause());
+ }
+
+ future = tx.commit();
+ masterActor.expectMsgClass(SubmitRequest.class);
+
+ // master doesn't reply
+ try {
+ future.get(5, TimeUnit.SECONDS);
+ fail("Exception should be thrown");
+ } catch (final ExecutionException e) {
+ Throwable cause = e.getCause();
+ assertTrue("Unexpected cause " + cause, cause instanceof TransactionCommitFailedException);
+ verifyDocumentedException(cause.getCause());
+ }
}
@Test
- public void testMasterDownRead() throws Exception {
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+ public void testDelayedMasterActorFuture() throws InterruptedException, TimeoutException, ExecutionException {
+ final Promise<Object> promise = Futures.promise();
+ ProxyReadWriteTransaction tx = new ProxyReadWriteTransaction(DEVICE_ID, promise.future(),
+ system.dispatcher(), Timeout.apply(5, TimeUnit.SECONDS));
+
+ final ListenableFuture<Optional<NormalizedNode>> read = tx.read(STORE, PATH);
+ final ListenableFuture<Boolean> exists = tx.exists(STORE, PATH);
+
+ tx.put(STORE, PATH, node);
+ tx.merge(STORE, PATH, node);
+ tx.delete(STORE, PATH);
+
+ final ListenableFuture<?> commit = tx.commit();
+
+ promise.success(masterActor.ref());
+
masterActor.expectMsgClass(ReadRequest.class);
- //master doesn't reply
+ masterActor.reply(new NormalizedNodeMessage(PATH, node));
+
+ masterActor.expectMsgClass(ExistsRequest.class);
+ masterActor.reply(Boolean.TRUE);
+
+ masterActor.expectMsgClass(PutRequest.class);
+ masterActor.expectMsgClass(MergeRequest.class);
+ masterActor.expectMsgClass(DeleteRequest.class);
+
+ masterActor.expectMsgClass(SubmitRequest.class);
+ masterActor.reply(new Success(null));
+
+ read.get(5, TimeUnit.SECONDS).isPresent();
+ assertTrue(exists.get(5, TimeUnit.SECONDS));
+ commit.get(5, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void testFailedMasterActorFuture() throws InterruptedException, TimeoutException {
+ final AskTimeoutException mockEx = new AskTimeoutException("mock");
+ ProxyReadWriteTransaction tx = new ProxyReadWriteTransaction(DEVICE_ID, Futures.failed(mockEx),
+ system.dispatcher(), Timeout.apply(5, TimeUnit.SECONDS));
+
+ ListenableFuture<?> future = tx.read(STORE, PATH);
try {
- read.checkedGet();
- Assert.fail("Exception should be thrown");
- } catch (final ReadFailedException e) {
- final Throwable cause = e.getCause();
- Assert.assertTrue(cause instanceof DocumentedException);
- final DocumentedException de = (DocumentedException) cause;
- Assert.assertEquals(DocumentedException.ErrorSeverity.WARNING, de.getErrorSeverity());
- Assert.assertEquals(DocumentedException.ErrorTag.OPERATION_FAILED, de.getErrorTag());
- Assert.assertEquals(DocumentedException.ErrorType.APPLICATION, de.getErrorType());
+ future.get(5, TimeUnit.SECONDS);
+ fail("Exception should be thrown");
+ } catch (final ExecutionException e) {
+ Throwable cause = e.getCause();
+ assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
+ assertEquals(mockEx, cause.getCause());
}
+
+ future = tx.exists(STORE, PATH);
+ try {
+ future.get(5, TimeUnit.SECONDS);
+ fail("Exception should be thrown");
+ } catch (final ExecutionException e) {
+ Throwable cause = e.getCause();
+ assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
+ assertEquals(mockEx, cause.getCause());
+ }
+
+ tx.put(STORE, PATH, node);
+ tx.merge(STORE, PATH, node);
+ tx.delete(STORE, PATH);
+
+ future = tx.commit();
+ try {
+ future.get(5, TimeUnit.SECONDS);
+ fail("Exception should be thrown");
+ } catch (final ExecutionException e) {
+ Throwable cause = e.getCause();
+ assertTrue("Unexpected cause " + cause, cause instanceof TransactionCommitFailedException);
+ assertEquals(mockEx, cause.getCause());
+ }
+ }
+
+ private static void verifyDocumentedException(final Throwable cause) {
+ assertTrue("Unexpected cause " + cause, cause instanceof DocumentedException);
+ final DocumentedException de = (DocumentedException) cause;
+ assertEquals(ErrorSeverity.WARNING, de.getErrorSeverity());
+ assertEquals(ErrorTag.OPERATION_FAILED, de.getErrorTag());
+ assertEquals(ErrorType.APPLICATION, de.getErrorType());
}
-}
\ No newline at end of file
+}