2 * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Mockito.mock;
15 import akka.actor.ActorRef;
16 import akka.actor.Status.Failure;
17 import akka.dispatch.ExecutionContexts;
18 import akka.dispatch.OnComplete;
19 import akka.testkit.javadsl.TestKit;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import com.google.common.util.concurrent.SettableFuture;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25 import org.junit.Before;
26 import org.junit.Test;
27 import org.mockito.Mockito;
28 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
29 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
30 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
31 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
32 import org.opendaylight.controller.cluster.access.concepts.MemberName;
33 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
34 import org.opendaylight.controller.cluster.datastore.config.Configuration;
35 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
36 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
37 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
38 import scala.concurrent.Await;
39 import scala.concurrent.Future;
40 import scala.concurrent.duration.FiniteDuration;
43 * Test whether RmoteTransactionContext operates correctly.
45 public class RemoteTransactionContextTest extends AbstractActorTest {
46 private static final TransactionIdentifier TX_ID = new TransactionIdentifier(new LocalHistoryIdentifier(
47 ClientIdentifier.create(FrontendIdentifier.create(MemberName.forName("test"), FrontendType.forName("test")), 0),
50 private OperationLimiter limiter;
51 private RemoteTransactionContext txContext;
52 private ActorUtils actorUtils;
56 public void before() {
57 kit = new TestKit(getSystem());
58 actorUtils = Mockito.spy(new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
59 mock(Configuration.class)));
60 limiter = new OperationLimiter(TX_ID, 4, 0);
61 txContext = new RemoteTransactionContext(TX_ID, actorUtils.actorSelection(kit.getRef().path()), actorUtils,
62 DataStoreVersions.CURRENT_VERSION, limiter);
63 txContext.operationHandOffComplete();
67 * OperationLimiter should be correctly released when a failure, like AskTimeoutException occurs. Future reads
68 * need to complete immediately with the failure and modifications should not be throttled and thrown away
72 public void testLimiterOnFailure() throws TimeoutException, InterruptedException {
73 txContext.executeDelete(null, null);
74 txContext.executeDelete(null, null);
75 assertEquals(2, limiter.availablePermits());
77 final Future<Object> sendFuture = txContext.sendBatchedModifications();
78 assertEquals(2, limiter.availablePermits());
80 BatchedModifications msg = kit.expectMsgClass(BatchedModifications.class);
81 assertEquals(2, msg.getModifications().size());
82 assertEquals(1, msg.getTotalMessagesSent());
83 sendReply(new Failure(new NullPointerException()));
84 assertFuture(sendFuture, new OnComplete<>() {
86 public void onComplete(final Throwable failure, final Object success) {
87 assertTrue(failure instanceof NullPointerException);
88 assertEquals(4, limiter.availablePermits());
90 // The transaction has failed, no throttling should occur
91 txContext.executeDelete(null, null);
92 assertEquals(4, limiter.availablePermits());
94 // Executing a read should result in immediate failure
95 final SettableFuture<Boolean> readFuture = SettableFuture.create();
96 txContext.executeRead(new DataExists(), readFuture, null);
97 assertTrue(readFuture.isDone());
100 fail("Read future did not fail");
101 } catch (ExecutionException | InterruptedException e) {
102 assertTrue(e.getCause() instanceof NullPointerException);
107 final Future<Object> commitFuture = txContext.directCommit(null);
109 msg = kit.expectMsgClass(BatchedModifications.class);
110 // Modification should have been thrown away by the dropped transmit induced by executeRead()
111 assertEquals(0, msg.getModifications().size());
112 assertTrue(msg.isDoCommitOnReady());
113 assertTrue(msg.isReady());
114 assertEquals(2, msg.getTotalMessagesSent());
115 sendReply(new Failure(new IllegalStateException()));
116 assertFuture(commitFuture, new OnComplete<>() {
118 public void onComplete(final Throwable failure, final Object success) {
119 assertTrue(failure instanceof IllegalStateException);
123 kit.expectNoMessage();
127 * OperationLimiter gives up throttling at some point -- {@link RemoteTransactionContext} needs to deal with that
131 public void testLimiterOnOverflowFailure() throws TimeoutException, InterruptedException {
132 txContext.executeDelete(null, null);
133 txContext.executeDelete(null, null);
134 txContext.executeDelete(null, null);
135 txContext.executeDelete(null, null);
136 assertEquals(0, limiter.availablePermits());
137 txContext.executeDelete(null, null);
138 // Last acquire should have failed ...
139 assertEquals(0, limiter.availablePermits());
141 final Future<Object> future = txContext.sendBatchedModifications();
142 assertEquals(0, limiter.availablePermits());
144 BatchedModifications msg = kit.expectMsgClass(BatchedModifications.class);
145 // ... so we are sending 5 modifications ...
146 assertEquals(5, msg.getModifications().size());
147 assertEquals(1, msg.getTotalMessagesSent());
148 sendReply(new Failure(new NullPointerException()));
150 assertFuture(future, new OnComplete<>() {
152 public void onComplete(final Throwable failure, final Object success) {
153 assertTrue(failure instanceof NullPointerException);
154 // ... but they account for only 4 permits.
155 assertEquals(4, limiter.availablePermits());
159 kit.expectNoMessage();
162 private void sendReply(final Object message) {
163 final ActorRef askActor = kit.getLastSender();
165 kit.reply(new Failure(new IllegalStateException()));
166 kit.expectTerminated(askActor);
169 private static void assertFuture(final Future<Object> future, final OnComplete<Object> complete)
170 throws TimeoutException, InterruptedException {
171 Await.ready(future, FiniteDuration.apply(3, TimeUnit.SECONDS));
172 future.onComplete(complete, ExecutionContexts.fromExecutor(MoreExecutors.directExecutor()));