Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / RemoteTransactionContextTest.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Mockito.mock;
14
15 import akka.actor.ActorRef;
16 import akka.actor.Status.Failure;
17 import akka.dispatch.ExecutionContexts;
18 import akka.dispatch.OnComplete;
19 import akka.testkit.javadsl.TestKit;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import com.google.common.util.concurrent.SettableFuture;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25 import org.junit.Before;
26 import org.junit.Test;
27 import org.mockito.Mockito;
28 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
29 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
30 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
31 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
32 import org.opendaylight.controller.cluster.access.concepts.MemberName;
33 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
34 import org.opendaylight.controller.cluster.datastore.config.Configuration;
35 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
36 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
37 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
38 import scala.concurrent.Await;
39 import scala.concurrent.Future;
40 import scala.concurrent.duration.FiniteDuration;
41
42 /**
43  * Test whether RmoteTransactionContext operates correctly.
44  */
45 public class RemoteTransactionContextTest extends AbstractActorTest {
46     private static final TransactionIdentifier TX_ID = new TransactionIdentifier(new LocalHistoryIdentifier(
47         ClientIdentifier.create(FrontendIdentifier.create(MemberName.forName("test"), FrontendType.forName("test")), 0),
48         0), 0);
49
50     private OperationLimiter limiter;
51     private RemoteTransactionContext txContext;
52     private ActorUtils actorUtils;
53     private TestKit kit;
54
55     @Before
56     public void before() {
57         kit = new TestKit(getSystem());
58         actorUtils = Mockito.spy(new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
59             mock(Configuration.class)));
60         limiter = new OperationLimiter(TX_ID, 4, 0);
61         txContext = new RemoteTransactionContext(TX_ID, actorUtils.actorSelection(kit.getRef().path()), actorUtils,
62             DataStoreVersions.CURRENT_VERSION, limiter);
63         txContext.operationHandOffComplete();
64     }
65
66     /**
67      * OperationLimiter should be correctly released when a failure, like AskTimeoutException occurs. Future reads
68      * need to complete immediately with the failure and modifications should not be throttled and thrown away
69      * immediately.
70      */
71     @Test
72     public void testLimiterOnFailure() throws TimeoutException, InterruptedException {
73         txContext.executeDelete(null, null);
74         txContext.executeDelete(null, null);
75         assertEquals(2, limiter.availablePermits());
76
77         final Future<Object> sendFuture = txContext.sendBatchedModifications();
78         assertEquals(2, limiter.availablePermits());
79
80         BatchedModifications msg = kit.expectMsgClass(BatchedModifications.class);
81         assertEquals(2, msg.getModifications().size());
82         assertEquals(1, msg.getTotalMessagesSent());
83         sendReply(new Failure(new NullPointerException()));
84         assertFuture(sendFuture, new OnComplete<>() {
85             @Override
86             public void onComplete(final Throwable failure, final Object success) {
87                 assertTrue(failure instanceof NullPointerException);
88                 assertEquals(4, limiter.availablePermits());
89
90                 // The transaction has failed, no throttling should occur
91                 txContext.executeDelete(null, null);
92                 assertEquals(4, limiter.availablePermits());
93
94                 // Executing a read should result in immediate failure
95                 final SettableFuture<Boolean> readFuture = SettableFuture.create();
96                 txContext.executeRead(new DataExists(), readFuture, null);
97                 assertTrue(readFuture.isDone());
98                 try {
99                     readFuture.get();
100                     fail("Read future did not fail");
101                 } catch (ExecutionException | InterruptedException e) {
102                     assertTrue(e.getCause() instanceof NullPointerException);
103                 }
104             }
105         });
106
107         final Future<Object> commitFuture = txContext.directCommit(null);
108
109         msg = kit.expectMsgClass(BatchedModifications.class);
110         // Modification should have been thrown away by the dropped transmit induced by executeRead()
111         assertEquals(0, msg.getModifications().size());
112         assertTrue(msg.isDoCommitOnReady());
113         assertTrue(msg.isReady());
114         assertEquals(2, msg.getTotalMessagesSent());
115         sendReply(new Failure(new IllegalStateException()));
116         assertFuture(commitFuture, new OnComplete<>() {
117             @Override
118             public void onComplete(final Throwable failure, final Object success) {
119                 assertTrue(failure instanceof IllegalStateException);
120             }
121         });
122
123         kit.expectNoMessage();
124     }
125
126     /**
127      * OperationLimiter gives up throttling at some point -- {@link RemoteTransactionContext} needs to deal with that
128      * case, too.
129      */
130     @Test
131     public void testLimiterOnOverflowFailure() throws TimeoutException, InterruptedException {
132         txContext.executeDelete(null, null);
133         txContext.executeDelete(null, null);
134         txContext.executeDelete(null, null);
135         txContext.executeDelete(null, null);
136         assertEquals(0, limiter.availablePermits());
137         txContext.executeDelete(null, null);
138         // Last acquire should have failed ...
139         assertEquals(0, limiter.availablePermits());
140
141         final Future<Object> future = txContext.sendBatchedModifications();
142         assertEquals(0, limiter.availablePermits());
143
144         BatchedModifications msg = kit.expectMsgClass(BatchedModifications.class);
145         // ... so we are sending 5 modifications ...
146         assertEquals(5, msg.getModifications().size());
147         assertEquals(1, msg.getTotalMessagesSent());
148         sendReply(new Failure(new NullPointerException()));
149
150         assertFuture(future, new OnComplete<>() {
151             @Override
152             public void onComplete(final Throwable failure, final Object success) {
153                 assertTrue(failure instanceof NullPointerException);
154                 // ... but they account for only 4 permits.
155                 assertEquals(4, limiter.availablePermits());
156             }
157         });
158
159         kit.expectNoMessage();
160     }
161
162     private void sendReply(final Object message) {
163         final ActorRef askActor = kit.getLastSender();
164         kit.watch(askActor);
165         kit.reply(new Failure(new IllegalStateException()));
166         kit.expectTerminated(askActor);
167     }
168
169     private static void assertFuture(final Future<Object> future, final OnComplete<Object> complete)
170             throws TimeoutException, InterruptedException {
171         Await.ready(future, FiniteDuration.apply(3, TimeUnit.SECONDS));
172         future.onComplete(complete, ExecutionContexts.fromExecutor(MoreExecutors.directExecutor()));
173     }
174 }