2 * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl.tx;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
15 import akka.actor.ActorSystem;
16 import akka.actor.Status.Failure;
17 import akka.actor.Status.Success;
18 import akka.dispatch.Futures;
19 import akka.pattern.AskTimeoutException;
20 import akka.testkit.TestProbe;
21 import akka.testkit.javadsl.TestKit;
22 import akka.util.Timeout;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import java.net.InetSocketAddress;
25 import java.util.Optional;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29 import org.junit.AfterClass;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
33 import org.opendaylight.mdsal.common.api.ReadFailedException;
34 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
35 import org.opendaylight.netconf.api.DocumentedException;
36 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
37 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
38 import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
39 import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
40 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
41 import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
42 import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
43 import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
44 import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
45 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
46 import org.opendaylight.yangtools.yang.common.QName;
47 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
48 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
49 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
50 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
51 import scala.concurrent.Promise;
52 import scala.concurrent.duration.Duration;
53 import scala.concurrent.duration.FiniteDuration;
55 public class ProxyReadWriteTransactionTest {
56 private static final FiniteDuration EXP_NO_MESSAGE_TIMEOUT = Duration.apply(300, TimeUnit.MILLISECONDS);
57 private static final RemoteDeviceId DEVICE_ID =
58 new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830));
59 private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
60 private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
62 private static ActorSystem system = ActorSystem.apply();
63 private TestProbe masterActor;
64 private ContainerNode node;
68 masterActor = new TestProbe(system);
69 node = Builders.containerBuilder()
70 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont")))
75 public static void staticTearDown() {
76 TestKit.shutdownActorSystem(system, true);
79 private ProxyReadWriteTransaction newSuccessfulProxyTx() {
80 return newSuccessfulProxyTx(Timeout.apply(5, TimeUnit.SECONDS));
83 private ProxyReadWriteTransaction newSuccessfulProxyTx(final Timeout timeout) {
84 return new ProxyReadWriteTransaction(DEVICE_ID, Futures.successful(masterActor.ref()),
85 system.dispatcher(), timeout);
89 public void testCancel() {
90 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
93 masterActor.expectMsgClass(CancelRequest.class);
94 masterActor.reply(Boolean.TRUE);
98 public void testCommit() throws InterruptedException, ExecutionException, TimeoutException {
99 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
104 public void testCommitAfterCancel() throws InterruptedException, ExecutionException, TimeoutException {
105 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
107 assertFalse(tx.cancel());
111 public void testDoubleCommit() throws InterruptedException, ExecutionException, TimeoutException {
112 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
117 fail("Should throw IllegalStateException");
118 } catch (final IllegalStateException e) {
119 masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
124 public void testDelete() {
125 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
127 tx.delete(STORE, PATH);
128 final DeleteRequest deleteRequest = masterActor.expectMsgClass(DeleteRequest.class);
129 assertEquals(STORE, deleteRequest.getStore());
130 assertEquals(PATH, deleteRequest.getPath());
134 public void testDeleteAfterCommit() throws InterruptedException, ExecutionException, TimeoutException {
135 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
139 tx.delete(STORE, PATH);
140 fail("Should throw IllegalStateException");
141 } catch (final IllegalStateException e) {
142 masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
147 public void testPut() {
148 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
150 tx.put(STORE, PATH, node);
151 final PutRequest putRequest = masterActor.expectMsgClass(PutRequest.class);
152 assertEquals(STORE, putRequest.getStore());
153 assertEquals(PATH, putRequest.getNormalizedNodeMessage().getIdentifier());
154 assertEquals(node, putRequest.getNormalizedNodeMessage().getNode());
158 public void testPutAfterCommit() throws InterruptedException, ExecutionException, TimeoutException {
159 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
163 tx.put(STORE, PATH, node);
164 fail("Should throw IllegalStateException");
165 } catch (final IllegalStateException e) {
166 masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
171 public void testMerge() {
172 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
174 tx.merge(STORE, PATH, node);
175 final MergeRequest mergeRequest = masterActor.expectMsgClass(MergeRequest.class);
176 assertEquals(STORE, mergeRequest.getStore());
177 assertEquals(PATH, mergeRequest.getNormalizedNodeMessage().getIdentifier());
178 assertEquals(node, mergeRequest.getNormalizedNodeMessage().getNode());
182 public void testMergeAfterCommit() throws InterruptedException, ExecutionException, TimeoutException {
183 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
187 tx.merge(STORE, PATH, node);
188 fail("Should throw IllegalStateException");
189 } catch (final IllegalStateException e) {
190 masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
194 private void commit(final ProxyReadWriteTransaction tx)
195 throws InterruptedException, ExecutionException, TimeoutException {
196 final ListenableFuture<?> submit = tx.commit();
197 masterActor.expectMsgClass(SubmitRequest.class);
198 masterActor.reply(new Success(null));
199 submit.get(5, TimeUnit.SECONDS);
203 public void testRead() throws Exception {
204 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
206 final ListenableFuture<Optional<NormalizedNode<?, ?>>> read = tx.read(STORE, PATH);
207 final ReadRequest readRequest = masterActor.expectMsgClass(ReadRequest.class);
208 assertEquals(STORE, readRequest.getStore());
209 assertEquals(PATH, readRequest.getPath());
211 masterActor.reply(new NormalizedNodeMessage(PATH, node));
212 final Optional<NormalizedNode<?, ?>> result = read.get(5, TimeUnit.SECONDS);
213 assertTrue(result.isPresent());
214 assertEquals(node, result.get());
218 public void testReadEmpty() throws Exception {
219 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
221 final ListenableFuture<Optional<NormalizedNode<?, ?>>> read = tx.read(STORE, PATH);
222 masterActor.expectMsgClass(ReadRequest.class);
223 masterActor.reply(new EmptyReadResponse());
224 final Optional<NormalizedNode<?, ?>> result = read.get(5, TimeUnit.SECONDS);
225 assertFalse(result.isPresent());
229 public void testReadFailure() throws InterruptedException, TimeoutException {
230 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
232 final ListenableFuture<Optional<NormalizedNode<?, ?>>> read = tx.read(STORE, PATH);
233 masterActor.expectMsgClass(ReadRequest.class);
234 final RuntimeException mockEx = new RuntimeException("fail");
235 masterActor.reply(new Failure(mockEx));
238 read.get(5, TimeUnit.SECONDS);
239 fail("Exception should be thrown");
240 } catch (final ExecutionException e) {
241 Throwable cause = e.getCause();
242 assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
243 assertEquals(mockEx, cause.getCause());
248 public void testExists() throws Exception {
249 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
251 final ListenableFuture<Boolean> read = tx.exists(STORE, PATH);
252 final ExistsRequest existsRequest = masterActor.expectMsgClass(ExistsRequest.class);
253 assertEquals(STORE, existsRequest.getStore());
254 assertEquals(PATH, existsRequest.getPath());
256 masterActor.reply(Boolean.TRUE);
257 final Boolean result = read.get(5, TimeUnit.SECONDS);
262 public void testExistsFailure() throws InterruptedException, TimeoutException {
263 ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
265 final ListenableFuture<Boolean> read = tx.exists(STORE, PATH);
266 masterActor.expectMsgClass(ExistsRequest.class);
267 final RuntimeException mockEx = new RuntimeException("fail");
268 masterActor.reply(new Failure(mockEx));
271 read.get(5, TimeUnit.SECONDS);
272 fail("Exception should be thrown");
273 } catch (final ExecutionException e) {
274 Throwable cause = e.getCause();
275 assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
276 assertEquals(mockEx, cause.getCause());
281 public void testFutureOperationsWithMasterDown() throws InterruptedException, TimeoutException {
282 ProxyReadWriteTransaction tx = newSuccessfulProxyTx(Timeout.apply(500, TimeUnit.MILLISECONDS));
284 ListenableFuture<?> future = tx.read(STORE, PATH);
285 masterActor.expectMsgClass(ReadRequest.class);
287 // master doesn't reply
289 future.get(5, TimeUnit.SECONDS);
290 fail("Exception should be thrown");
291 } catch (final ExecutionException e) {
292 Throwable cause = e.getCause();
293 assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
294 verifyDocumentedException(cause.getCause());
297 future = tx.exists(STORE, PATH);
298 masterActor.expectMsgClass(ExistsRequest.class);
300 // master doesn't reply
302 future.get(5, TimeUnit.SECONDS);
303 fail("Exception should be thrown");
304 } catch (final ExecutionException e) {
305 Throwable cause = e.getCause();
306 assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
307 verifyDocumentedException(cause.getCause());
310 future = tx.commit();
311 masterActor.expectMsgClass(SubmitRequest.class);
313 // master doesn't reply
315 future.get(5, TimeUnit.SECONDS);
316 fail("Exception should be thrown");
317 } catch (final ExecutionException e) {
318 Throwable cause = e.getCause();
319 assertTrue("Unexpected cause " + cause, cause instanceof TransactionCommitFailedException);
320 verifyDocumentedException(cause.getCause());
325 public void testDelayedMasterActorFuture() throws InterruptedException, TimeoutException, ExecutionException {
326 final Promise<Object> promise = Futures.promise();
327 ProxyReadWriteTransaction tx = new ProxyReadWriteTransaction(DEVICE_ID, promise.future(),
328 system.dispatcher(), Timeout.apply(5, TimeUnit.SECONDS));
330 final ListenableFuture<Optional<NormalizedNode<?, ?>>> read = tx.read(STORE, PATH);
331 final ListenableFuture<Boolean> exists = tx.exists(STORE, PATH);
333 tx.put(STORE, PATH, node);
334 tx.merge(STORE, PATH, node);
335 tx.delete(STORE, PATH);
337 final ListenableFuture<?> commit = tx.commit();
339 promise.success(masterActor.ref());
341 masterActor.expectMsgClass(ReadRequest.class);
342 masterActor.reply(new NormalizedNodeMessage(PATH, node));
344 masterActor.expectMsgClass(ExistsRequest.class);
345 masterActor.reply(Boolean.TRUE);
347 masterActor.expectMsgClass(PutRequest.class);
348 masterActor.expectMsgClass(MergeRequest.class);
349 masterActor.expectMsgClass(DeleteRequest.class);
351 masterActor.expectMsgClass(SubmitRequest.class);
352 masterActor.reply(new Success(null));
354 read.get(5, TimeUnit.SECONDS).isPresent();
355 assertTrue(exists.get(5, TimeUnit.SECONDS));
356 commit.get(5, TimeUnit.SECONDS);
360 public void testFailedMasterActorFuture() throws InterruptedException, TimeoutException {
361 final AskTimeoutException mockEx = new AskTimeoutException("mock");
362 ProxyReadWriteTransaction tx = new ProxyReadWriteTransaction(DEVICE_ID, Futures.failed(mockEx),
363 system.dispatcher(), Timeout.apply(5, TimeUnit.SECONDS));
365 ListenableFuture<?> future = tx.read(STORE, PATH);
367 future.get(5, TimeUnit.SECONDS);
368 fail("Exception should be thrown");
369 } catch (final ExecutionException e) {
370 Throwable cause = e.getCause();
371 assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
372 assertEquals(mockEx, cause.getCause());
375 future = tx.exists(STORE, PATH);
377 future.get(5, TimeUnit.SECONDS);
378 fail("Exception should be thrown");
379 } catch (final ExecutionException e) {
380 Throwable cause = e.getCause();
381 assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
382 assertEquals(mockEx, cause.getCause());
385 tx.put(STORE, PATH, node);
386 tx.merge(STORE, PATH, node);
387 tx.delete(STORE, PATH);
389 future = tx.commit();
391 future.get(5, TimeUnit.SECONDS);
392 fail("Exception should be thrown");
393 } catch (final ExecutionException e) {
394 Throwable cause = e.getCause();
395 assertTrue("Unexpected cause " + cause, cause instanceof TransactionCommitFailedException);
396 assertEquals(mockEx, cause.getCause());
400 private static void verifyDocumentedException(final Throwable cause) {
401 assertTrue("Unexpected cause " + cause, cause instanceof DocumentedException);
402 final DocumentedException de = (DocumentedException) cause;
403 assertEquals(DocumentedException.ErrorSeverity.WARNING, de.getErrorSeverity());
404 assertEquals(DocumentedException.ErrorTag.OPERATION_FAILED, de.getErrorTag());
405 assertEquals(DocumentedException.ErrorType.APPLICATION, de.getErrorType());