2 * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl.tx;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
15 import akka.actor.ActorSystem;
16 import akka.actor.Status.Failure;
17 import akka.actor.Status.Success;
18 import akka.dispatch.Futures;
19 import akka.pattern.AskTimeoutException;
20 import akka.testkit.TestProbe;
21 import akka.testkit.javadsl.TestKit;
22 import akka.util.Timeout;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import java.net.InetSocketAddress;
25 import java.util.Optional;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29 import org.junit.AfterClass;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
33 import org.opendaylight.mdsal.common.api.ReadFailedException;
34 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
35 import org.opendaylight.netconf.api.DocumentedException;
36 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
37 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
38 import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
39 import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
40 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
41 import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
42 import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
43 import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
44 import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
45 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
46 import org.opendaylight.yangtools.yang.common.ErrorSeverity;
47 import org.opendaylight.yangtools.yang.common.ErrorType;
48 import org.opendaylight.yangtools.yang.common.QName;
49 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
50 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
51 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
52 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
53 import scala.concurrent.Promise;
54 import scala.concurrent.duration.Duration;
55 import scala.concurrent.duration.FiniteDuration;
/**
 * Unit tests for {@code ProxyReadWriteTransaction}. A {@link TestProbe} stands in for the master
 * actor: tests invoke operations on the transaction, inspect the request messages the probe
 * receives and script the probe's replies.
 */
57 public class ProxyReadWriteTransactionTest {
// How long expectNoMessage(...) waits before concluding that the master actor received nothing.
58 private static final FiniteDuration EXP_NO_MESSAGE_TIMEOUT = Duration.apply(300, TimeUnit.MILLISECONDS);
// Device identity handed to every transaction built in this class; the address is left unresolved.
59 private static final RemoteDeviceId DEVICE_ID =
60 new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830));
// Path and datastore used by every operation issued in these tests.
61 private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.empty();
62 private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
// Actor system shared by all tests; it is shut down in staticTearDown().
64 private static ActorSystem system = ActorSystem.apply();
// Probe playing the master actor; assigned by the setup statements that follow.
65 private TestProbe masterActor;
// Container node used as put/merge payload and as read reply; assigned by the setup statements that follow.
66 private ContainerNode node;
// NOTE(review): the method header for these statements is not visible here — presumably the
// per-test setup; confirm. The visible code creates a fresh probe on the shared actor system and
// starts building a container node identified by QName.create("", "cont"); the end of the
// builder chain is not visible either.
70 masterActor = new TestProbe(system);
71 node = Builders.containerBuilder()
72 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont")))
77 public static void staticTearDown() {
78 TestKit.shutdownActorSystem(system, true);
81 private ProxyReadWriteTransaction newSuccessfulProxyTx() {
82 return newSuccessfulProxyTx(Timeout.apply(5, TimeUnit.SECONDS));
85 private ProxyReadWriteTransaction newSuccessfulProxyTx(final Timeout timeout) {
86 return new ProxyReadWriteTransaction(DEVICE_ID, Futures.successful(masterActor.ref()),
87 system.dispatcher(), timeout);
// Cancel path: the probe must receive a CancelRequest, which it answers with Boolean.TRUE.
// NOTE(review): the statement that triggers the cancel is not visible in this excerpt — confirm.
91 public void testCancel() {
92 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
95 masterActor.expectMsgClass(CancelRequest.class);
96 masterActor.reply(Boolean.TRUE);
// Commit test on a freshly created proxy transaction.
// NOTE(review): only the transaction creation is visible here; presumably the body goes on to
// run the commit(tx) helper — confirm.
100 public void testCommit() throws InterruptedException, ExecutionException, TimeoutException {
101 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
// Asserts that cancel() reports false.
// NOTE(review): whatever step puts the transaction into the state where cancel() must return
// false is not visible in this excerpt — confirm against the full method.
106 public void testCommitAfterCancel() throws InterruptedException, ExecutionException, TimeoutException {
107 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
109 assertFalse(tx.cancel());
// Expects an IllegalStateException; once it is caught, verifies that the master receives no
// message within EXP_NO_MESSAGE_TIMEOUT.
// NOTE(review): the `try` opener and the calls that should raise the exception are not visible
// in this excerpt — confirm.
113 public void testDoubleCommit() throws InterruptedException, ExecutionException, TimeoutException {
114 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
// Reaching this line means the expected exception was not thrown.
119 fail("Should throw IllegalStateException");
120 } catch (final IllegalStateException e) {
// The rejected operation must not have produced any message for the master.
121 masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
126 public void testDelete() {
127 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
129 tx.delete(STORE, PATH);
130 final DeleteRequest deleteRequest = masterActor.expectMsgClass(DeleteRequest.class);
131 assertEquals(STORE, deleteRequest.getStore());
132 assertEquals(PATH, deleteRequest.getPath());
// Expects delete(STORE, PATH) to throw IllegalStateException and the master to receive no
// message afterwards.
// NOTE(review): the `try` opener and the step that precedes the delete are not visible in this
// excerpt — confirm.
136 public void testDeleteAfterCommit() throws InterruptedException, ExecutionException, TimeoutException {
137 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
141 tx.delete(STORE, PATH);
// Reaching this line means the expected exception was not thrown.
142 fail("Should throw IllegalStateException");
143 } catch (final IllegalStateException e) {
// The rejected delete must not have produced any message for the master.
144 masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
149 public void testPut() {
150 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
152 tx.put(STORE, PATH, node);
153 final PutRequest putRequest = masterActor.expectMsgClass(PutRequest.class);
154 assertEquals(STORE, putRequest.getStore());
155 assertEquals(PATH, putRequest.getNormalizedNodeMessage().getIdentifier());
156 assertEquals(node, putRequest.getNormalizedNodeMessage().getNode());
// Expects put(STORE, PATH, node) to throw IllegalStateException and the master to receive no
// message afterwards.
// NOTE(review): the `try` opener and the step that precedes the put are not visible in this
// excerpt — confirm.
160 public void testPutAfterCommit() throws InterruptedException, ExecutionException, TimeoutException {
161 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
165 tx.put(STORE, PATH, node);
// Reaching this line means the expected exception was not thrown.
166 fail("Should throw IllegalStateException");
167 } catch (final IllegalStateException e) {
// The rejected put must not have produced any message for the master.
168 masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
173 public void testMerge() {
174 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
176 tx.merge(STORE, PATH, node);
177 final MergeRequest mergeRequest = masterActor.expectMsgClass(MergeRequest.class);
178 assertEquals(STORE, mergeRequest.getStore());
179 assertEquals(PATH, mergeRequest.getNormalizedNodeMessage().getIdentifier());
180 assertEquals(node, mergeRequest.getNormalizedNodeMessage().getNode());
// Expects merge(STORE, PATH, node) to throw IllegalStateException and the master to receive no
// message afterwards.
// NOTE(review): the `try` opener and the step that precedes the merge are not visible in this
// excerpt — confirm.
184 public void testMergeAfterCommit() throws InterruptedException, ExecutionException, TimeoutException {
185 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
189 tx.merge(STORE, PATH, node);
// Reaching this line means the expected exception was not thrown.
190 fail("Should throw IllegalStateException");
191 } catch (final IllegalStateException e) {
// The rejected merge must not have produced any message for the master.
192 masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
196 private void commit(final ProxyReadWriteTransaction tx)
197 throws InterruptedException, ExecutionException, TimeoutException {
198 final ListenableFuture<?> submit = tx.commit();
199 masterActor.expectMsgClass(SubmitRequest.class);
200 masterActor.reply(new Success(null));
201 submit.get(5, TimeUnit.SECONDS);
205 public void testRead() throws Exception {
206 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
208 final ListenableFuture<Optional<NormalizedNode>> read = tx.read(STORE, PATH);
209 final ReadRequest readRequest = masterActor.expectMsgClass(ReadRequest.class);
210 assertEquals(STORE, readRequest.getStore());
211 assertEquals(PATH, readRequest.getPath());
213 masterActor.reply(new NormalizedNodeMessage(PATH, node));
214 final Optional<NormalizedNode> result = read.get(5, TimeUnit.SECONDS);
215 assertTrue(result.isPresent());
216 assertEquals(node, result.get());
220 public void testReadEmpty() throws Exception {
221 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
223 final ListenableFuture<Optional<NormalizedNode>> read = tx.read(STORE, PATH);
224 masterActor.expectMsgClass(ReadRequest.class);
225 masterActor.reply(new EmptyReadResponse());
226 final Optional<NormalizedNode> result = read.get(5, TimeUnit.SECONDS);
227 assertFalse(result.isPresent());
// Verifies that a Failure reply to a read surfaces as an ExecutionException whose cause is a
// ReadFailedException that in turn wraps the exception the master sent.
// NOTE(review): the `try` opener pairing with the catch below is not visible in this excerpt.
231 public void testReadFailure() throws InterruptedException, TimeoutException {
232 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
234 final ListenableFuture<Optional<NormalizedNode>> read = tx.read(STORE, PATH);
235 masterActor.expectMsgClass(ReadRequest.class);
// Reply with a failure status carrying a known exception instance.
236 final RuntimeException mockEx = new RuntimeException("fail");
237 masterActor.reply(new Failure(mockEx));
240 read.get(5, TimeUnit.SECONDS);
// Reaching this line means the read future completed normally.
241 fail("Exception should be thrown");
242 } catch (final ExecutionException e) {
243 Throwable cause = e.getCause();
244 assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
// The master's exception must be preserved as the nested cause.
245 assertEquals(mockEx, cause.getCause());
// Verifies that exists() sends the master an ExistsRequest carrying the store and path, then
// replies Boolean.TRUE and waits up to 5 seconds for the result.
// NOTE(review): no assertion on `result` is visible in this excerpt — confirm that one follows.
250 public void testExists() throws Exception {
251 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
253 final ListenableFuture<Boolean> read = tx.exists(STORE, PATH);
254 final ExistsRequest existsRequest = masterActor.expectMsgClass(ExistsRequest.class);
255 assertEquals(STORE, existsRequest.getStore());
256 assertEquals(PATH, existsRequest.getPath());
258 masterActor.reply(Boolean.TRUE);
259 final Boolean result = read.get(5, TimeUnit.SECONDS);
// Verifies that a Failure reply to an exists check surfaces as an ExecutionException whose cause
// is a ReadFailedException that in turn wraps the exception the master sent.
// NOTE(review): the `try` opener pairing with the catch below is not visible in this excerpt.
264 public void testExistsFailure() throws InterruptedException, TimeoutException {
265 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
267 final ListenableFuture<Boolean> read = tx.exists(STORE, PATH);
268 masterActor.expectMsgClass(ExistsRequest.class);
// Reply with a failure status carrying a known exception instance.
269 final RuntimeException mockEx = new RuntimeException("fail");
270 masterActor.reply(new Failure(mockEx));
273 read.get(5, TimeUnit.SECONDS);
// Reaching this line means the exists future completed normally.
274 fail("Exception should be thrown");
275 } catch (final ExecutionException e) {
276 Throwable cause = e.getCause();
277 assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
// The master's exception must be preserved as the nested cause.
278 assertEquals(mockEx, cause.getCause());
// Verifies what happens when the master receives a request but never answers: read and exists
// must fail with a ReadFailedException, commit with a TransactionCommitFailedException, and in
// each case the nested cause must satisfy verifyDocumentedException.
// NOTE(review): the three `try` openers pairing with the catches below are not visible in this
// excerpt.
283 public void testFutureOperationsWithMasterDown() throws InterruptedException, TimeoutException {
// Short 500 ms transaction timeout so the missing replies are detected well inside the
// 5-second waits on the futures below.
284 ProxyReadWriteTransaction tx = newSuccessfulProxyTx(Timeout.apply(500, TimeUnit.MILLISECONDS));
// --- read ---
286 ListenableFuture<?> future = tx.read(STORE, PATH);
287 masterActor.expectMsgClass(ReadRequest.class);
289 // master doesn't reply
291 future.get(5, TimeUnit.SECONDS);
292 fail("Exception should be thrown");
293 } catch (final ExecutionException e) {
294 Throwable cause = e.getCause();
295 assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
296 verifyDocumentedException(cause.getCause());
// --- exists ---
299 future = tx.exists(STORE, PATH);
300 masterActor.expectMsgClass(ExistsRequest.class);
302 // master doesn't reply
304 future.get(5, TimeUnit.SECONDS);
305 fail("Exception should be thrown");
306 } catch (final ExecutionException e) {
307 Throwable cause = e.getCause();
308 assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
309 verifyDocumentedException(cause.getCause());
// --- commit ---
312 future = tx.commit();
313 masterActor.expectMsgClass(SubmitRequest.class);
315 // master doesn't reply
317 future.get(5, TimeUnit.SECONDS);
318 fail("Exception should be thrown");
319 } catch (final ExecutionException e) {
320 Throwable cause = e.getCause();
321 assertTrue("Unexpected cause " + cause, cause instanceof TransactionCommitFailedException);
322 verifyDocumentedException(cause.getCause());
327 public void testDelayedMasterActorFuture() throws InterruptedException, TimeoutException, ExecutionException {
328 final Promise<Object> promise = Futures.promise();
329 ProxyReadWriteTransaction tx = new ProxyReadWriteTransaction(DEVICE_ID, promise.future(),
330 system.dispatcher(), Timeout.apply(5, TimeUnit.SECONDS));
332 final ListenableFuture<Optional<NormalizedNode>> read = tx.read(STORE, PATH);
333 final ListenableFuture<Boolean> exists = tx.exists(STORE, PATH);
335 tx.put(STORE, PATH, node);
336 tx.merge(STORE, PATH, node);
337 tx.delete(STORE, PATH);
339 final ListenableFuture<?> commit = tx.commit();
341 promise.success(masterActor.ref());
343 masterActor.expectMsgClass(ReadRequest.class);
344 masterActor.reply(new NormalizedNodeMessage(PATH, node));
346 masterActor.expectMsgClass(ExistsRequest.class);
347 masterActor.reply(Boolean.TRUE);
349 masterActor.expectMsgClass(PutRequest.class);
350 masterActor.expectMsgClass(MergeRequest.class);
351 masterActor.expectMsgClass(DeleteRequest.class);
353 masterActor.expectMsgClass(SubmitRequest.class);
354 masterActor.reply(new Success(null));
356 read.get(5, TimeUnit.SECONDS).isPresent();
357 assertTrue(exists.get(5, TimeUnit.SECONDS));
358 commit.get(5, TimeUnit.SECONDS);
// Verifies behaviour when the master actor future itself has failed: read and exists must fail
// with a ReadFailedException, commit with a TransactionCommitFailedException, and in each case
// the nested cause must be the exception the master actor future failed with.
// NOTE(review): the three `try` openers pairing with the catches below are not visible in this
// excerpt.
362 public void testFailedMasterActorFuture() throws InterruptedException, TimeoutException {
// The transaction is built on an already-failed master actor future.
363 final AskTimeoutException mockEx = new AskTimeoutException("mock");
364 ProxyReadWriteTransaction tx = new ProxyReadWriteTransaction(DEVICE_ID, Futures.failed(mockEx),
365 system.dispatcher(), Timeout.apply(5, TimeUnit.SECONDS));
// --- read ---
367 ListenableFuture<?> future = tx.read(STORE, PATH);
369 future.get(5, TimeUnit.SECONDS);
370 fail("Exception should be thrown");
371 } catch (final ExecutionException e) {
372 Throwable cause = e.getCause();
373 assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
374 assertEquals(mockEx, cause.getCause());
// --- exists ---
377 future = tx.exists(STORE, PATH);
379 future.get(5, TimeUnit.SECONDS);
380 fail("Exception should be thrown");
381 } catch (final ExecutionException e) {
382 Throwable cause = e.getCause();
383 assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
384 assertEquals(mockEx, cause.getCause());
// Write operations are issued before the commit whose failure is checked next.
387 tx.put(STORE, PATH, node);
388 tx.merge(STORE, PATH, node);
389 tx.delete(STORE, PATH);
// --- commit ---
391 future = tx.commit();
393 future.get(5, TimeUnit.SECONDS);
394 fail("Exception should be thrown");
395 } catch (final ExecutionException e) {
396 Throwable cause = e.getCause();
397 assertTrue("Unexpected cause " + cause, cause instanceof TransactionCommitFailedException);
398 assertEquals(mockEx, cause.getCause());
402 private static void verifyDocumentedException(final Throwable cause) {
403 assertTrue("Unexpected cause " + cause, cause instanceof DocumentedException);
404 final DocumentedException de = (DocumentedException) cause;
405 assertEquals(ErrorSeverity.WARNING, de.getErrorSeverity());
406 assertEquals(DocumentedException.ErrorTag.OPERATION_FAILED, de.getErrorTag());
407 assertEquals(ErrorType.APPLICATION, de.getErrorType());