2 * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl.tx;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
15 import akka.actor.ActorSystem;
16 import akka.actor.Status.Failure;
17 import akka.actor.Status.Success;
18 import akka.dispatch.Futures;
19 import akka.pattern.AskTimeoutException;
20 import akka.testkit.TestProbe;
21 import akka.testkit.javadsl.TestKit;
22 import akka.util.Timeout;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import java.net.InetSocketAddress;
25 import java.util.Optional;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29 import org.junit.AfterClass;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
33 import org.opendaylight.mdsal.common.api.ReadFailedException;
34 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
35 import org.opendaylight.netconf.api.DocumentedException;
36 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceId;
37 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
38 import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
39 import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
40 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
41 import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
42 import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
43 import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
44 import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
45 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
46 import org.opendaylight.yangtools.yang.common.ErrorSeverity;
47 import org.opendaylight.yangtools.yang.common.ErrorTag;
48 import org.opendaylight.yangtools.yang.common.ErrorType;
49 import org.opendaylight.yangtools.yang.common.QName;
50 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
51 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
52 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
53 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
54 import scala.concurrent.Promise;
55 import scala.concurrent.duration.Duration;
56 import scala.concurrent.duration.FiniteDuration;
58 public class ProxyReadWriteTransactionTest {
// Window passed to masterActor.expectNoMessage(...) when a test asserts that nothing reached the master.
59 private static final FiniteDuration EXP_NO_MESSAGE_TIMEOUT = Duration.apply(300, TimeUnit.MILLISECONDS);
// Device identity handed to every ProxyReadWriteTransaction constructed in this class.
60 private static final RemoteDeviceId DEVICE_ID =
61 new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830));
// Path and datastore used by all read/write requests issued in these tests.
62 private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.empty();
63 private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
// Actor system shared by the whole class; staticTearDown() shuts it down.
65 private static ActorSystem system = ActorSystem.apply();
// Probe that plays the master actor: tests expect requests on it and reply through it.
66 private TestProbe masterActor;
// Container node used as the put/merge payload and as the data returned by read replies.
67 private ContainerNode node;
// Fixture code: creates a new probe on the shared actor system to play the master actor and starts
// building a container node identified by the QName ("", "cont").
// NOTE(review): the enclosing method header and the end of the builder chain are not visible in this
// excerpt — confirm against the full file.
71 masterActor = new TestProbe(system);
72 node = Builders.containerBuilder()
73 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont")))
78 public static void staticTearDown() {
79 TestKit.shutdownActorSystem(system, true);
82 private ProxyReadWriteTransaction newSuccessfulProxyTx() {
83 return newSuccessfulProxyTx(Timeout.apply(5, TimeUnit.SECONDS));
86 private ProxyReadWriteTransaction newSuccessfulProxyTx(final Timeout timeout) {
87 return new ProxyReadWriteTransaction(DEVICE_ID, Futures.successful(masterActor.ref()),
88 system.dispatcher(), timeout);
// Expects the master probe to receive a CancelRequest and answers it with Boolean.TRUE.
// NOTE(review): the statement that triggers the request is not visible in this excerpt (presumably a
// cancel call on tx) — confirm against the full file.
92 public void testCancel() {
93 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
96 masterActor.expectMsgClass(CancelRequest.class);
97 masterActor.reply(Boolean.TRUE);
// Creates the proxy transaction used by the commit test.
// NOTE(review): the rest of the body is not visible in this excerpt (presumably it runs the commit(tx)
// helper) — confirm against the full file.
101 public void testCommit() throws InterruptedException, ExecutionException, TimeoutException {
102 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
// Asserts that tx.cancel() returns false.
// NOTE(review): a statement between the tx creation and the assertion is not visible in this excerpt
// (presumably the transaction is committed first) — confirm against the full file.
107 public void testCommitAfterCancel() throws InterruptedException, ExecutionException, TimeoutException {
108 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
110 assertFalse(tx.cancel());
// Expects an IllegalStateException (fail(...) guards the path where none is thrown) and then checks that
// the master probe receives nothing within EXP_NO_MESSAGE_TIMEOUT.
// NOTE(review): the statements between the tx creation and fail(...) are not visible in this excerpt
// (presumably a first commit, the opening of the try block and a second commit) — confirm against the
// full file.
114 public void testDoubleCommit() throws InterruptedException, ExecutionException, TimeoutException {
115 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
120 fail("Should throw IllegalStateException");
121 } catch (final IllegalStateException e) {
122 masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
127 public void testDelete() {
128 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
130 tx.delete(STORE, PATH);
131 final DeleteRequest deleteRequest = masterActor.expectMsgClass(DeleteRequest.class);
132 assertEquals(STORE, deleteRequest.getStore());
133 assertEquals(PATH, deleteRequest.getPath());
// Expects tx.delete(STORE, PATH) to throw IllegalStateException and, once it has, that the master probe
// receives nothing within EXP_NO_MESSAGE_TIMEOUT.
// NOTE(review): the lines between the tx creation and the delete call are not visible in this excerpt
// (presumably the commit and the opening of the try block) — confirm against the full file.
137 public void testDeleteAfterCommit() throws InterruptedException, ExecutionException, TimeoutException {
138 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
142 tx.delete(STORE, PATH);
143 fail("Should throw IllegalStateException");
144 } catch (final IllegalStateException e) {
145 masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
150 public void testPut() {
151 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
153 tx.put(STORE, PATH, node);
154 final PutRequest putRequest = masterActor.expectMsgClass(PutRequest.class);
155 assertEquals(STORE, putRequest.getStore());
156 assertEquals(PATH, putRequest.getNormalizedNodeMessage().getIdentifier());
157 assertEquals(node, putRequest.getNormalizedNodeMessage().getNode());
// Expects tx.put(STORE, PATH, node) to throw IllegalStateException and, once it has, that the master
// probe receives nothing within EXP_NO_MESSAGE_TIMEOUT.
// NOTE(review): the lines between the tx creation and the put call are not visible in this excerpt
// (presumably the commit and the opening of the try block) — confirm against the full file.
161 public void testPutAfterCommit() throws InterruptedException, ExecutionException, TimeoutException {
162 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
166 tx.put(STORE, PATH, node);
167 fail("Should throw IllegalStateException");
168 } catch (final IllegalStateException e) {
169 masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
174 public void testMerge() {
175 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
177 tx.merge(STORE, PATH, node);
178 final MergeRequest mergeRequest = masterActor.expectMsgClass(MergeRequest.class);
179 assertEquals(STORE, mergeRequest.getStore());
180 assertEquals(PATH, mergeRequest.getNormalizedNodeMessage().getIdentifier());
181 assertEquals(node, mergeRequest.getNormalizedNodeMessage().getNode());
// Expects tx.merge(STORE, PATH, node) to throw IllegalStateException and, once it has, that the master
// probe receives nothing within EXP_NO_MESSAGE_TIMEOUT.
// NOTE(review): the lines between the tx creation and the merge call are not visible in this excerpt
// (presumably the commit and the opening of the try block) — confirm against the full file.
185 public void testMergeAfterCommit() throws InterruptedException, ExecutionException, TimeoutException {
186 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
190 tx.merge(STORE, PATH, node);
191 fail("Should throw IllegalStateException");
192 } catch (final IllegalStateException e) {
193 masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
197 private void commit(final ProxyReadWriteTransaction tx)
198 throws InterruptedException, ExecutionException, TimeoutException {
199 final ListenableFuture<?> submit = tx.commit();
200 masterActor.expectMsgClass(SubmitRequest.class);
201 masterActor.reply(new Success(null));
202 submit.get(5, TimeUnit.SECONDS);
206 public void testRead() throws Exception {
207 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
209 final ListenableFuture<Optional<NormalizedNode>> read = tx.read(STORE, PATH);
210 final ReadRequest readRequest = masterActor.expectMsgClass(ReadRequest.class);
211 assertEquals(STORE, readRequest.getStore());
212 assertEquals(PATH, readRequest.getPath());
214 masterActor.reply(new NormalizedNodeMessage(PATH, node));
215 final Optional<NormalizedNode> result = read.get(5, TimeUnit.SECONDS);
216 assertTrue(result.isPresent());
217 assertEquals(node, result.get());
221 public void testReadEmpty() throws Exception {
222 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
224 final ListenableFuture<Optional<NormalizedNode>> read = tx.read(STORE, PATH);
225 masterActor.expectMsgClass(ReadRequest.class);
226 masterActor.reply(new EmptyReadResponse());
227 final Optional<NormalizedNode> result = read.get(5, TimeUnit.SECONDS);
228 assertFalse(result.isPresent());
232 public void testReadFailure() throws InterruptedException, TimeoutException {
233 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
235 final ListenableFuture<Optional<NormalizedNode>> read = tx.read(STORE, PATH);
236 masterActor.expectMsgClass(ReadRequest.class);
237 final RuntimeException mockEx = new RuntimeException("fail");
238 masterActor.reply(new Failure(mockEx));
241 read.get(5, TimeUnit.SECONDS);
242 fail("Exception should be thrown");
243 } catch (final ExecutionException e) {
244 Throwable cause = e.getCause();
245 assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
246 assertEquals(mockEx, cause.getCause());
// Issues exists(STORE, PATH), checks that the ExistsRequest seen by the master probe carries the same
// datastore and path, replies Boolean.TRUE and waits up to five seconds for the future's value.
// NOTE(review): `result` is not used in the visible lines; the assertion on it is presumably on a line
// not shown in this excerpt — confirm against the full file.
251 public void testExists() throws Exception {
252 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
254 final ListenableFuture<Boolean> read = tx.exists(STORE, PATH);
255 final ExistsRequest existsRequest = masterActor.expectMsgClass(ExistsRequest.class);
256 assertEquals(STORE, existsRequest.getStore());
257 assertEquals(PATH, existsRequest.getPath());
259 masterActor.reply(Boolean.TRUE);
260 final Boolean result = read.get(5, TimeUnit.SECONDS);
265 public void testExistsFailure() throws InterruptedException, TimeoutException {
266 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
268 final ListenableFuture<Boolean> read = tx.exists(STORE, PATH);
269 masterActor.expectMsgClass(ExistsRequest.class);
270 final RuntimeException mockEx = new RuntimeException("fail");
271 masterActor.reply(new Failure(mockEx));
274 read.get(5, TimeUnit.SECONDS);
275 fail("Exception should be thrown");
276 } catch (final ExecutionException e) {
277 Throwable cause = e.getCause();
278 assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
279 assertEquals(mockEx, cause.getCause());
284 public void testFutureOperationsWithMasterDown() throws InterruptedException, TimeoutException {
285 ProxyReadWriteTransaction tx = newSuccessfulProxyTx(Timeout.apply(500, TimeUnit.MILLISECONDS));
287 ListenableFuture<?> future = tx.read(STORE, PATH);
288 masterActor.expectMsgClass(ReadRequest.class);
290 // master doesn't reply
292 future.get(5, TimeUnit.SECONDS);
293 fail("Exception should be thrown");
294 } catch (final ExecutionException e) {
295 Throwable cause = e.getCause();
296 assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
297 verifyDocumentedException(cause.getCause());
300 future = tx.exists(STORE, PATH);
301 masterActor.expectMsgClass(ExistsRequest.class);
303 // master doesn't reply
305 future.get(5, TimeUnit.SECONDS);
306 fail("Exception should be thrown");
307 } catch (final ExecutionException e) {
308 Throwable cause = e.getCause();
309 assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
310 verifyDocumentedException(cause.getCause());
313 future = tx.commit();
314 masterActor.expectMsgClass(SubmitRequest.class);
316 // master doesn't reply
318 future.get(5, TimeUnit.SECONDS);
319 fail("Exception should be thrown");
320 } catch (final ExecutionException e) {
321 Throwable cause = e.getCause();
322 assertTrue("Unexpected cause " + cause, cause instanceof TransactionCommitFailedException);
323 verifyDocumentedException(cause.getCause());
328 public void testDelayedMasterActorFuture() throws InterruptedException, TimeoutException, ExecutionException {
329 final Promise<Object> promise = Futures.promise();
330 ProxyReadWriteTransaction tx = new ProxyReadWriteTransaction(DEVICE_ID, promise.future(),
331 system.dispatcher(), Timeout.apply(5, TimeUnit.SECONDS));
333 final ListenableFuture<Optional<NormalizedNode>> read = tx.read(STORE, PATH);
334 final ListenableFuture<Boolean> exists = tx.exists(STORE, PATH);
336 tx.put(STORE, PATH, node);
337 tx.merge(STORE, PATH, node);
338 tx.delete(STORE, PATH);
340 final ListenableFuture<?> commit = tx.commit();
342 promise.success(masterActor.ref());
344 masterActor.expectMsgClass(ReadRequest.class);
345 masterActor.reply(new NormalizedNodeMessage(PATH, node));
347 masterActor.expectMsgClass(ExistsRequest.class);
348 masterActor.reply(Boolean.TRUE);
350 masterActor.expectMsgClass(PutRequest.class);
351 masterActor.expectMsgClass(MergeRequest.class);
352 masterActor.expectMsgClass(DeleteRequest.class);
354 masterActor.expectMsgClass(SubmitRequest.class);
355 masterActor.reply(new Success(null));
357 read.get(5, TimeUnit.SECONDS).isPresent();
358 assertTrue(exists.get(5, TimeUnit.SECONDS));
359 commit.get(5, TimeUnit.SECONDS);
363 public void testFailedMasterActorFuture() throws InterruptedException, TimeoutException {
364 final AskTimeoutException mockEx = new AskTimeoutException("mock");
365 ProxyReadWriteTransaction tx = new ProxyReadWriteTransaction(DEVICE_ID, Futures.failed(mockEx),
366 system.dispatcher(), Timeout.apply(5, TimeUnit.SECONDS));
368 ListenableFuture<?> future = tx.read(STORE, PATH);
370 future.get(5, TimeUnit.SECONDS);
371 fail("Exception should be thrown");
372 } catch (final ExecutionException e) {
373 Throwable cause = e.getCause();
374 assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
375 assertEquals(mockEx, cause.getCause());
378 future = tx.exists(STORE, PATH);
380 future.get(5, TimeUnit.SECONDS);
381 fail("Exception should be thrown");
382 } catch (final ExecutionException e) {
383 Throwable cause = e.getCause();
384 assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
385 assertEquals(mockEx, cause.getCause());
388 tx.put(STORE, PATH, node);
389 tx.merge(STORE, PATH, node);
390 tx.delete(STORE, PATH);
392 future = tx.commit();
394 future.get(5, TimeUnit.SECONDS);
395 fail("Exception should be thrown");
396 } catch (final ExecutionException e) {
397 Throwable cause = e.getCause();
398 assertTrue("Unexpected cause " + cause, cause instanceof TransactionCommitFailedException);
399 assertEquals(mockEx, cause.getCause());
403 private static void verifyDocumentedException(final Throwable cause) {
404 assertTrue("Unexpected cause " + cause, cause instanceof DocumentedException);
405 final DocumentedException de = (DocumentedException) cause;
406 assertEquals(ErrorSeverity.WARNING, de.getErrorSeverity());
407 assertEquals(ErrorTag.OPERATION_FAILED, de.getErrorTag());
408 assertEquals(ErrorType.APPLICATION, de.getErrorType());