cd6e066b43165f519d854b0b6603e8a990d3cc80
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / RemoteTransactionContextTest.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Mockito.mock;
14
15 import akka.actor.ActorRef;
16 import akka.actor.Status.Failure;
17 import akka.dispatch.ExecutionContexts;
18 import akka.dispatch.OnComplete;
19 import akka.testkit.javadsl.TestKit;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import com.google.common.util.concurrent.SettableFuture;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25 import org.junit.Before;
26 import org.junit.Test;
27 import org.mockito.Mockito;
28 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
29 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
30 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
31 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
32 import org.opendaylight.controller.cluster.access.concepts.MemberName;
33 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
34 import org.opendaylight.controller.cluster.datastore.config.Configuration;
35 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
36 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
37 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
38 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
39 import scala.concurrent.Await;
40 import scala.concurrent.Future;
41 import scala.concurrent.duration.FiniteDuration;
42
43 /**
44  * Test whether RmoteTransactionContext operates correctly.
45  */
46 public class RemoteTransactionContextTest extends AbstractActorTest {
47     private static final TransactionIdentifier TX_ID = new TransactionIdentifier(new LocalHistoryIdentifier(
48         ClientIdentifier.create(FrontendIdentifier.create(MemberName.forName("test"), FrontendType.forName("test")), 0),
49         0), 0);
50     private static final DeleteModification DELETE = new DeleteModification(DataStoreVersions.CURRENT_VERSION);
51
52     private OperationLimiter limiter;
53     private RemoteTransactionContext txContext;
54     private ActorContext actorContext;
55     private TestKit kit;
56
57     @Before
58     public void before() {
59         kit = new TestKit(getSystem());
60         actorContext = Mockito.spy(new ActorContext(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
61             mock(Configuration.class)));
62         limiter = new OperationLimiter(TX_ID, 4, 0);
63         txContext = new RemoteTransactionContext(TX_ID, actorContext.actorSelection(kit.getRef().path()), actorContext,
64             DataStoreVersions.CURRENT_VERSION, limiter);
65         txContext.operationHandOffComplete();
66     }
67
68     /**
69      * OperationLimiter should be correctly released when a failure, like AskTimeoutException occurs. Future reads
70      * need to complete immediately with the failure and modifications should not be throttled and thrown away
71      * immediately.
72      */
73     @Test
74     public void testLimiterOnFailure() throws TimeoutException, InterruptedException {
75         txContext.executeModification(DELETE, null);
76         txContext.executeModification(DELETE, null);
77         assertEquals(2, limiter.availablePermits());
78
79         Future<Object> future = txContext.sendBatchedModifications();
80         assertEquals(2, limiter.availablePermits());
81
82         BatchedModifications msg = kit.expectMsgClass(BatchedModifications.class);
83         assertEquals(2, msg.getModifications().size());
84         assertEquals(1, msg.getTotalMessagesSent());
85         sendReply(new Failure(new NullPointerException()));
86         assertFuture(future, new OnComplete<Object>() {
87             @Override
88             public void onComplete(final Throwable failure, final Object success) {
89                 assertTrue(failure instanceof NullPointerException);
90                 assertEquals(4, limiter.availablePermits());
91
92                 // The transaction has failed, no throttling should occur
93                 txContext.executeModification(DELETE, null);
94                 assertEquals(4, limiter.availablePermits());
95
96                 // Executing a read should result in immediate failure
97                 final SettableFuture<Boolean> readFuture = SettableFuture.create();
98                 txContext.executeRead(new DataExists(), readFuture, null);
99                 assertTrue(readFuture.isDone());
100                 try {
101                     readFuture.get();
102                     fail("Read future did not fail");
103                 } catch (ExecutionException | InterruptedException e) {
104                     assertTrue(e.getCause() instanceof NullPointerException);
105                 }
106             }
107         });
108
109         future = txContext.directCommit(null);
110
111         msg = kit.expectMsgClass(BatchedModifications.class);
112         // Modification should have been thrown away by the dropped transmit induced by executeRead()
113         assertEquals(0, msg.getModifications().size());
114         assertTrue(msg.isDoCommitOnReady());
115         assertTrue(msg.isReady());
116         assertEquals(2, msg.getTotalMessagesSent());
117         sendReply(new Failure(new IllegalStateException()));
118         assertFuture(future, new OnComplete<Object>() {
119             @Override
120             public void onComplete(final Throwable failure, final Object success) {
121                 assertTrue(failure instanceof IllegalStateException);
122             }
123         });
124
125         kit.expectNoMessage();
126     }
127
128     /**
129      * OperationLimiter gives up throttling at some point -- {@link RemoteTransactionContext} needs to deal with that
130      * case, too.
131      */
132     @Test
133     public void testLimiterOnOverflowFailure() throws TimeoutException, InterruptedException {
134         txContext.executeModification(DELETE, null);
135         txContext.executeModification(DELETE, null);
136         txContext.executeModification(DELETE, null);
137         txContext.executeModification(DELETE, null);
138         assertEquals(0, limiter.availablePermits());
139         txContext.executeModification(DELETE, null);
140         // Last acquire should have failed ...
141         assertEquals(0, limiter.availablePermits());
142
143         Future<Object> future = txContext.sendBatchedModifications();
144         assertEquals(0, limiter.availablePermits());
145
146         BatchedModifications msg = kit.expectMsgClass(BatchedModifications.class);
147         // ... so we are sending 5 modifications ...
148         assertEquals(5, msg.getModifications().size());
149         assertEquals(1, msg.getTotalMessagesSent());
150         sendReply(new Failure(new NullPointerException()));
151
152         assertFuture(future, new OnComplete<Object>() {
153             @Override
154             public void onComplete(final Throwable failure, final Object success) {
155                 assertTrue(failure instanceof NullPointerException);
156                 // ... but they account for only 4 permits.
157                 assertEquals(4, limiter.availablePermits());
158             }
159         });
160
161         kit.expectNoMessage();
162     }
163
164     private void sendReply(final Object message) {
165         final ActorRef askActor = kit.getLastSender();
166         kit.watch(askActor);
167         kit.reply(new Failure(new IllegalStateException()));
168         kit.expectTerminated(askActor);
169     }
170
171     private static void assertFuture(final Future<Object> future, final OnComplete<Object> complete)
172             throws TimeoutException, InterruptedException {
173         Await.ready(future, FiniteDuration.apply(3, TimeUnit.SECONDS));
174         future.onComplete(complete, ExecutionContexts.fromExecutor(MoreExecutors.directExecutor()));
175     }
176 }