2 * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl.tx;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
15 import akka.actor.ActorSystem;
16 import akka.actor.Status.Failure;
17 import akka.actor.Status.Success;
18 import akka.dispatch.Futures;
19 import akka.pattern.AskTimeoutException;
20 import akka.testkit.TestProbe;
21 import akka.testkit.javadsl.TestKit;
22 import akka.util.Timeout;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import java.net.InetSocketAddress;
25 import java.util.Optional;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29 import org.junit.AfterClass;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
33 import org.opendaylight.mdsal.common.api.ReadFailedException;
34 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
35 import org.opendaylight.netconf.api.DocumentedException;
36 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
37 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
38 import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
39 import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
40 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
41 import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
42 import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
43 import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
44 import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
45 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
46 import org.opendaylight.yangtools.yang.common.ErrorSeverity;
47 import org.opendaylight.yangtools.yang.common.QName;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
49 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
50 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
51 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
52 import scala.concurrent.Promise;
53 import scala.concurrent.duration.Duration;
54 import scala.concurrent.duration.FiniteDuration;
/**
 * Unit tests for ProxyReadWriteTransaction. A TestProbe takes the place of the master actor so each
 * test can inspect the request messages the transaction sends and script the replies.
 */
56 public class ProxyReadWriteTransactionTest {
// Wait used with expectNoMessage() when a test asserts that nothing was sent to the master.
57 private static final FiniteDuration EXP_NO_MESSAGE_TIMEOUT = Duration.apply(300, TimeUnit.MILLISECONDS);
// Device id passed to every ProxyReadWriteTransaction constructed in this class.
58 private static final RemoteDeviceId DEVICE_ID =
59 new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830));
// Path (the empty identifier) and datastore used for every read/write request in the tests.
60 private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.empty();
61 private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
// Actor system shared by all tests; shut down in staticTearDown().
63 private static ActorSystem system = ActorSystem.apply();
// Probe whose ref is handed to the transaction as the master actor.
64 private TestProbe masterActor;
// Container node used as the payload for put/merge and as the scripted read reply.
65 private ContainerNode node;
// NOTE(review): the enclosing method header and the end of the builder chain are not visible in
// this excerpt; presumably this is the per-test setup. Confirm against the full file.
// Creates a new probe on the shared actor system and starts building the container node,
// identified by a QName with an empty namespace and the local name "cont".
69 masterActor = new TestProbe(system);
70 node = Builders.containerBuilder()
71 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont")))
/**
 * Shuts down the shared actor system.
 * NOTE(review): the second argument ({@code true}) looks like Akka TestKit's verifySystemShutdown
 * flag; confirm against the Akka documentation.
 */
76 public static void staticTearDown() {
77 TestKit.shutdownActorSystem(system, true);
/**
 * Creates a transaction with a 5-second timeout.
 *
 * @return a new transaction built by {@link #newSuccessfulProxyTx(Timeout)}
 */
80 private ProxyReadWriteTransaction newSuccessfulProxyTx() {
81 return newSuccessfulProxyTx(Timeout.apply(5, TimeUnit.SECONDS));
/**
 * Creates a transaction whose master-actor future is already completed successfully with the
 * probe's ref, running on the shared actor system's dispatcher.
 *
 * @param timeout timeout passed to the transaction's constructor
 * @return the new transaction
 */
84 private ProxyReadWriteTransaction newSuccessfulProxyTx(final Timeout timeout) {
85 return new ProxyReadWriteTransaction(DEVICE_ID, Futures.successful(masterActor.ref()),
86 system.dispatcher(), timeout);
/**
 * Checks that the master receives a CancelRequest, which the probe answers with TRUE.
 * NOTE(review): the statement that triggers the request is not visible in this excerpt.
 */
90 public void testCancel() {
91 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
// The probe must see a CancelRequest; TRUE is sent back as the master's answer.
94 masterActor.expectMsgClass(CancelRequest.class);
95 masterActor.reply(Boolean.TRUE);
/**
 * Commit test on a transaction whose master actor is already resolved.
 * NOTE(review): the rest of the body is not visible in this excerpt; presumably it delegates to
 * the commit(tx) helper. Confirm against the full file.
 */
99 public void testCommit() throws InterruptedException, ExecutionException, TimeoutException {
100 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
/**
 * Checks that cancel() returns false.
 * NOTE(review): going by the test name a commit precedes the assertion, but that statement is not
 * visible in this excerpt.
 */
105 public void testCommitAfterCancel() throws InterruptedException, ExecutionException, TimeoutException {
106 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
108 assertFalse(tx.cancel());
/**
 * Expects an IllegalStateException and verifies that the rejected call sends nothing to the master.
 * NOTE(review): the statements between the transaction's creation and fail() are not visible in
 * this excerpt; going by the test name they commit the transaction twice.
 */
112 public void testDoubleCommit() throws InterruptedException, ExecutionException, TimeoutException {
113 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
// Reaching this line means the expected IllegalStateException was not thrown.
118 fail("Should throw IllegalStateException");
119 } catch (final IllegalStateException e) {
// The rejected call must not have produced any message to the master.
120 masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
/**
 * Checks that delete() sends the master a DeleteRequest carrying the same store and path.
 */
125 public void testDelete() {
126 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
128 tx.delete(STORE, PATH);
// Inspect the message the probe received on behalf of the master.
129 final DeleteRequest deleteRequest = masterActor.expectMsgClass(DeleteRequest.class);
130 assertEquals(STORE, deleteRequest.getStore());
131 assertEquals(PATH, deleteRequest.getPath());
/**
 * Expects delete() to throw IllegalStateException and to send nothing to the master.
 * NOTE(review): the statements before delete() are not visible in this excerpt; going by the test
 * name the transaction is committed first.
 */
135 public void testDeleteAfterCommit() throws InterruptedException, ExecutionException, TimeoutException {
136 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
140 tx.delete(STORE, PATH);
// Reaching this line means delete() did not throw.
141 fail("Should throw IllegalStateException");
142 } catch (final IllegalStateException e) {
// The rejected delete must not have produced any message to the master.
143 masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
/**
 * Checks that put() sends the master a PutRequest carrying the same store, path and node.
 */
148 public void testPut() {
149 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
151 tx.put(STORE, PATH, node);
// Path and node travel inside the request's NormalizedNodeMessage.
152 final PutRequest putRequest = masterActor.expectMsgClass(PutRequest.class);
153 assertEquals(STORE, putRequest.getStore());
154 assertEquals(PATH, putRequest.getNormalizedNodeMessage().getIdentifier());
155 assertEquals(node, putRequest.getNormalizedNodeMessage().getNode());
/**
 * Expects put() to throw IllegalStateException and to send nothing to the master.
 * NOTE(review): the statements before put() are not visible in this excerpt; going by the test
 * name the transaction is committed first.
 */
159 public void testPutAfterCommit() throws InterruptedException, ExecutionException, TimeoutException {
160 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
164 tx.put(STORE, PATH, node);
// Reaching this line means put() did not throw.
165 fail("Should throw IllegalStateException");
166 } catch (final IllegalStateException e) {
// The rejected put must not have produced any message to the master.
167 masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
/**
 * Checks that merge() sends the master a MergeRequest carrying the same store, path and node.
 */
172 public void testMerge() {
173 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
175 tx.merge(STORE, PATH, node);
// Path and node travel inside the request's NormalizedNodeMessage.
176 final MergeRequest mergeRequest = masterActor.expectMsgClass(MergeRequest.class);
177 assertEquals(STORE, mergeRequest.getStore());
178 assertEquals(PATH, mergeRequest.getNormalizedNodeMessage().getIdentifier());
179 assertEquals(node, mergeRequest.getNormalizedNodeMessage().getNode());
/**
 * Expects merge() to throw IllegalStateException and to send nothing to the master.
 * NOTE(review): the statements before merge() are not visible in this excerpt; going by the test
 * name the transaction is committed first.
 */
183 public void testMergeAfterCommit() throws InterruptedException, ExecutionException, TimeoutException {
184 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
188 tx.merge(STORE, PATH, node);
// Reaching this line means merge() did not throw.
189 fail("Should throw IllegalStateException");
190 } catch (final IllegalStateException e) {
// The rejected merge must not have produced any message to the master.
191 masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
/**
 * Commits the transaction with the probe acting as a master that accepts the commit.
 *
 * @param tx transaction to commit
 * @throws InterruptedException if interrupted while waiting for the commit future
 * @throws ExecutionException if the commit future completes exceptionally
 * @throws TimeoutException if the commit future does not complete within 5 seconds
 */
195 private void commit(final ProxyReadWriteTransaction tx)
196 throws InterruptedException, ExecutionException, TimeoutException {
197 final ListenableFuture<?> submit = tx.commit();
// The master must receive a SubmitRequest; answer it with a Success status.
198 masterActor.expectMsgClass(SubmitRequest.class);
199 masterActor.reply(new Success(null));
// Block until the commit future completes so callers continue with a finished commit.
200 submit.get(5, TimeUnit.SECONDS);
/**
 * Checks that read() sends a ReadRequest with the same store and path, and that a
 * NormalizedNodeMessage reply completes the future with that node.
 */
204 public void testRead() throws Exception {
205 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
207 final ListenableFuture<Optional<NormalizedNode>> read = tx.read(STORE, PATH);
208 final ReadRequest readRequest = masterActor.expectMsgClass(ReadRequest.class);
209 assertEquals(STORE, readRequest.getStore());
210 assertEquals(PATH, readRequest.getPath());
// Script the master's answer, then check it comes back through the future.
212 masterActor.reply(new NormalizedNodeMessage(PATH, node));
213 final Optional<NormalizedNode> result = read.get(5, TimeUnit.SECONDS);
214 assertTrue(result.isPresent());
215 assertEquals(node, result.get());
/**
 * Checks that an EmptyReadResponse reply completes the read future with an empty Optional.
 */
219 public void testReadEmpty() throws Exception {
220 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
222 final ListenableFuture<Optional<NormalizedNode>> read = tx.read(STORE, PATH);
223 masterActor.expectMsgClass(ReadRequest.class);
// The master answers that there is no data at the path.
224 masterActor.reply(new EmptyReadResponse());
225 final Optional<NormalizedNode> result = read.get(5, TimeUnit.SECONDS);
226 assertFalse(result.isPresent());
/**
 * Checks that a Failure reply to a ReadRequest fails the read future with a ReadFailedException
 * whose cause is the exception carried by the Failure.
 * NOTE(review): the opening of the try block is not visible in this excerpt.
 */
230 public void testReadFailure() throws InterruptedException, TimeoutException {
231 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
233 final ListenableFuture<Optional<NormalizedNode>> read = tx.read(STORE, PATH);
234 masterActor.expectMsgClass(ReadRequest.class);
// Script the master to fail the request with a known exception instance.
235 final RuntimeException mockEx = new RuntimeException("fail");
236 masterActor.reply(new Failure(mockEx));
239 read.get(5, TimeUnit.SECONDS);
// Reaching this line means the future completed normally.
240 fail("Exception should be thrown");
241 } catch (final ExecutionException e) {
// Expected chain: ExecutionException -> ReadFailedException -> mockEx.
242 Throwable cause = e.getCause();
243 assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
244 assertEquals(mockEx, cause.getCause());
/**
 * Checks that exists() sends an ExistsRequest with the same store and path, then waits for the
 * future after the master replies TRUE.
 * NOTE(review): no assertion on {@code result} is visible in this excerpt; confirm the full file
 * checks it.
 */
249 public void testExists() throws Exception {
250 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
252 final ListenableFuture<Boolean> read = tx.exists(STORE, PATH);
253 final ExistsRequest existsRequest = masterActor.expectMsgClass(ExistsRequest.class);
254 assertEquals(STORE, existsRequest.getStore());
255 assertEquals(PATH, existsRequest.getPath());
// Script the master's answer and wait for it to come back through the future.
257 masterActor.reply(Boolean.TRUE);
258 final Boolean result = read.get(5, TimeUnit.SECONDS);
/**
 * Checks that a Failure reply to an ExistsRequest fails the exists future with a
 * ReadFailedException whose cause is the exception carried by the Failure.
 * NOTE(review): the opening of the try block is not visible in this excerpt.
 */
263 public void testExistsFailure() throws InterruptedException, TimeoutException {
264 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
266 final ListenableFuture<Boolean> read = tx.exists(STORE, PATH);
267 masterActor.expectMsgClass(ExistsRequest.class);
// Script the master to fail the request with a known exception instance.
268 final RuntimeException mockEx = new RuntimeException("fail");
269 masterActor.reply(new Failure(mockEx));
272 read.get(5, TimeUnit.SECONDS);
// Reaching this line means the future completed normally.
273 fail("Exception should be thrown");
274 } catch (final ExecutionException e) {
// Expected chain: ExecutionException -> ReadFailedException -> mockEx.
275 Throwable cause = e.getCause();
276 assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
277 assertEquals(mockEx, cause.getCause());
/**
 * Checks how read, exists and commit futures fail when the master receives the request but never
 * answers. The transaction is created with a 500 ms timeout, well below the 5 seconds each
 * future is awaited. read and exists must fail with ReadFailedException, commit with
 * TransactionCommitFailedException; in every case the nested cause is checked by
 * verifyDocumentedException.
 * NOTE(review): the openings of the three try blocks are not visible in this excerpt.
 */
282 public void testFutureOperationsWithMasterDown() throws InterruptedException, TimeoutException {
283 ProxyReadWriteTransaction tx = newSuccessfulProxyTx(Timeout.apply(500, TimeUnit.MILLISECONDS));
// --- read ---
285 ListenableFuture<?> future = tx.read(STORE, PATH);
286 masterActor.expectMsgClass(ReadRequest.class);
288 // master doesn't reply
290 future.get(5, TimeUnit.SECONDS);
291 fail("Exception should be thrown");
292 } catch (final ExecutionException e) {
293 Throwable cause = e.getCause();
294 assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
295 verifyDocumentedException(cause.getCause());
// --- exists ---
298 future = tx.exists(STORE, PATH);
299 masterActor.expectMsgClass(ExistsRequest.class);
301 // master doesn't reply
303 future.get(5, TimeUnit.SECONDS);
304 fail("Exception should be thrown");
305 } catch (final ExecutionException e) {
306 Throwable cause = e.getCause();
307 assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
308 verifyDocumentedException(cause.getCause());
// --- commit: fails with the commit-specific exception type ---
311 future = tx.commit();
312 masterActor.expectMsgClass(SubmitRequest.class);
314 // master doesn't reply
316 future.get(5, TimeUnit.SECONDS);
317 fail("Exception should be thrown");
318 } catch (final ExecutionException e) {
319 Throwable cause = e.getCause();
320 assertTrue("Unexpected cause " + cause, cause instanceof TransactionCommitFailedException);
321 verifyDocumentedException(cause.getCause());
326 public void testDelayedMasterActorFuture() throws InterruptedException, TimeoutException, ExecutionException {
327 final Promise<Object> promise = Futures.promise();
328 ProxyReadWriteTransaction tx = new ProxyReadWriteTransaction(DEVICE_ID, promise.future(),
329 system.dispatcher(), Timeout.apply(5, TimeUnit.SECONDS));
331 final ListenableFuture<Optional<NormalizedNode>> read = tx.read(STORE, PATH);
332 final ListenableFuture<Boolean> exists = tx.exists(STORE, PATH);
334 tx.put(STORE, PATH, node);
335 tx.merge(STORE, PATH, node);
336 tx.delete(STORE, PATH);
338 final ListenableFuture<?> commit = tx.commit();
340 promise.success(masterActor.ref());
342 masterActor.expectMsgClass(ReadRequest.class);
343 masterActor.reply(new NormalizedNodeMessage(PATH, node));
345 masterActor.expectMsgClass(ExistsRequest.class);
346 masterActor.reply(Boolean.TRUE);
348 masterActor.expectMsgClass(PutRequest.class);
349 masterActor.expectMsgClass(MergeRequest.class);
350 masterActor.expectMsgClass(DeleteRequest.class);
352 masterActor.expectMsgClass(SubmitRequest.class);
353 masterActor.reply(new Success(null));
355 read.get(5, TimeUnit.SECONDS).isPresent();
356 assertTrue(exists.get(5, TimeUnit.SECONDS));
357 commit.get(5, TimeUnit.SECONDS);
/**
 * Checks how futures fail when the master-actor future itself has already failed with an
 * AskTimeoutException. read and exists must fail with ReadFailedException, commit with
 * TransactionCommitFailedException; in every case the nested cause must be the original
 * AskTimeoutException. The probe is never involved because no master ref is ever supplied.
 * NOTE(review): the openings of the three try blocks are not visible in this excerpt.
 */
361 public void testFailedMasterActorFuture() throws InterruptedException, TimeoutException {
362 final AskTimeoutException mockEx = new AskTimeoutException("mock");
363 ProxyReadWriteTransaction tx = new ProxyReadWriteTransaction(DEVICE_ID, Futures.failed(mockEx),
364 system.dispatcher(), Timeout.apply(5, TimeUnit.SECONDS));
// --- read ---
366 ListenableFuture<?> future = tx.read(STORE, PATH);
368 future.get(5, TimeUnit.SECONDS);
369 fail("Exception should be thrown");
370 } catch (final ExecutionException e) {
371 Throwable cause = e.getCause();
372 assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
373 assertEquals(mockEx, cause.getCause());
// --- exists ---
376 future = tx.exists(STORE, PATH);
378 future.get(5, TimeUnit.SECONDS);
379 fail("Exception should be thrown");
380 } catch (final ExecutionException e) {
381 Throwable cause = e.getCause();
382 assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
383 assertEquals(mockEx, cause.getCause());
// The write operations return nothing; they are only invoked here, with no assertion on them.
386 tx.put(STORE, PATH, node);
387 tx.merge(STORE, PATH, node);
388 tx.delete(STORE, PATH);
// --- commit: fails with the commit-specific exception type ---
390 future = tx.commit();
392 future.get(5, TimeUnit.SECONDS);
393 fail("Exception should be thrown");
394 } catch (final ExecutionException e) {
395 Throwable cause = e.getCause();
396 assertTrue("Unexpected cause " + cause, cause instanceof TransactionCommitFailedException);
397 assertEquals(mockEx, cause.getCause());
/**
 * Asserts that the given throwable is a DocumentedException with severity WARNING, error tag
 * OPERATION_FAILED and error type APPLICATION.
 *
 * @param cause throwable to check
 */
401 private static void verifyDocumentedException(final Throwable cause) {
// Check the type first so a mismatch is reported with the message below, not as a ClassCastException.
402 assertTrue("Unexpected cause " + cause, cause instanceof DocumentedException);
403 final DocumentedException de = (DocumentedException) cause;
404 assertEquals(ErrorSeverity.WARNING, de.getErrorSeverity());
405 assertEquals(DocumentedException.ErrorTag.OPERATION_FAILED, de.getErrorTag());
406 assertEquals(DocumentedException.ErrorType.APPLICATION, de.getErrorType());