2d5b696271e13869262be29c60eb953c0ce55871
[controller.git] / opendaylight / md-sal / cds-access-client / src / test / java / org / opendaylight / controller / cluster / access / client / ConnectingClientConnectionTest.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Matchers.any;
17 import static org.mockito.Mockito.doNothing;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.verify;
20 import static org.mockito.Mockito.verifyNoMoreInteractions;
21
22 import akka.actor.ActorRef;
23 import akka.actor.ActorSystem;
24 import akka.testkit.TestProbe;
25 import java.util.Optional;
26 import java.util.concurrent.ThreadLocalRandom;
27 import java.util.concurrent.TimeUnit;
28 import java.util.function.Consumer;
29 import org.junit.After;
30 import org.junit.AfterClass;
31 import org.junit.Before;
32 import org.junit.BeforeClass;
33 import org.junit.Test;
34 import org.mockito.ArgumentCaptor;
35 import org.mockito.Mock;
36 import org.mockito.MockitoAnnotations;
37 import org.opendaylight.controller.cluster.access.ABIVersion;
38 import org.opendaylight.controller.cluster.access.concepts.AbstractRequestFailureProxy;
39 import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy;
40 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
41 import org.opendaylight.controller.cluster.access.concepts.Request;
42 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
43 import org.opendaylight.controller.cluster.access.concepts.RequestException;
44 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
45 import org.opendaylight.controller.cluster.access.concepts.Response;
46 import org.opendaylight.controller.cluster.common.actor.TestTicker;
47 import org.opendaylight.yangtools.concepts.WritableIdentifier;
48 import scala.concurrent.duration.FiniteDuration;
49
50 /**
51  * Test suite covering logic contained in {@link ConnectingClientConnection}. It assumes {@link ConnectionEntryTest}
52  * passes.
53  *
54  * @author Robert Varga
55  */
56 public class ConnectingClientConnectionTest {
57     private static class MockFailure extends RequestFailure<WritableIdentifier, MockFailure> {
58         private static final long serialVersionUID = 1L;
59
60         MockFailure(final WritableIdentifier target, final RequestException cause) {
61             super(target, 0, cause);
62         }
63
64         @Override
65         protected AbstractRequestFailureProxy<WritableIdentifier, MockFailure> externalizableProxy(
66                 final ABIVersion version) {
67             return null;
68         }
69
70         @Override
71         protected MockFailure cloneAsVersion(final ABIVersion version) {
72             return this;
73         }
74     }
75
76     private static class MockRequest extends Request<WritableIdentifier, MockRequest> {
77         private static final long serialVersionUID = 1L;
78
79         MockRequest(final WritableIdentifier target, final ActorRef replyTo) {
80             super(target, 0, replyTo);
81         }
82
83         @Override
84         public RequestFailure<WritableIdentifier, ?> toRequestFailure(final RequestException cause) {
85             return new MockFailure(getTarget(), cause);
86         }
87
88         @Override
89         protected AbstractRequestProxy<WritableIdentifier, MockRequest> externalizableProxy(final ABIVersion version) {
90             return null;
91         }
92
93         @Override
94         protected MockRequest cloneAsVersion(final ABIVersion version) {
95             return this;
96         }
97     }
98
99     @Mock
100     private ActorRef mockReplyTo;
101     @Mock
102     private WritableIdentifier mockIdentifier;
103     @Mock
104     private RequestException mockCause;
105     @Mock
106     private Consumer<Response<?, ?>> mockCallback;
107     @Mock
108     private ClientActorBehavior<?> mockBehavior;
109     @Mock
110     private ClientActorContext mockContext;
111
112     private TestTicker ticker;
113     private BackendInfo mockBackendInfo;
114     private MockRequest mockRequest;
115     private MockRequest mockRequest2;
116     private RequestFailure<WritableIdentifier, ?> mockResponse;
117     private FailureEnvelope mockResponseEnvelope;
118     private Long mockCookie;
119
120     private static ActorSystem actorSystem;
121     private TestProbe mockActor;
122
123     private AbstractClientConnection<?> queue;
124
125     @BeforeClass
126     public static void setupClass() {
127         actorSystem = ActorSystem.apply();
128     }
129
130     @AfterClass
131     public static void teardownClass() {
132         actorSystem.terminate();
133     }
134
135     @Before
136     public void setup() {
137         MockitoAnnotations.initMocks(this);
138
139         doNothing().when(mockCallback).accept(any(MockFailure.class));
140
141         ticker = new TestTicker();
142         ticker.increment(ThreadLocalRandom.current().nextLong());
143         doReturn(ticker).when(mockContext).ticker();
144
145         mockActor = TestProbe.apply(actorSystem);
146         mockBackendInfo = new BackendInfo(mockActor.ref(), 0, ABIVersion.current(), 5);
147         mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
148         mockRequest2 = new MockRequest(mockIdentifier, mockReplyTo);
149         mockResponse = mockRequest.toRequestFailure(mockCause);
150         mockResponseEnvelope = new FailureEnvelope(mockResponse, 0, 0, 0);
151         mockCookie = ThreadLocalRandom.current().nextLong();
152
153         queue = new ConnectingClientConnection<>(mockContext, mockCookie);
154     }
155
156     @After
157     public void teardown() {
158         actorSystem.stop(mockActor.ref());
159     }
160
161     @Test
162     public void testCookie() {
163         assertEquals(mockCookie, queue.cookie());
164     }
165
166     @Test
167     public void testPoison() {
168         queue.sendRequest(mockRequest, mockCallback);
169         queue.poison(mockCause);
170
171         final ArgumentCaptor<MockFailure> captor = ArgumentCaptor.forClass(MockFailure.class);
172         verify(mockCallback).accept(captor.capture());
173         assertSame(mockCause, captor.getValue().getCause());
174     }
175
176     @Test(expected = IllegalStateException.class)
177     public void testPoisonPerformsClose() {
178         // Implies close()
179         queue.poison(mockCause);
180
181         // Kaboom
182         queue.sendRequest(mockRequest, mockCallback);
183     }
184
185     @Test
186     public void testPoisonIdempotent() {
187         queue.poison(mockCause);
188         queue.poison(mockCause);
189     }
190
191     @Test
192     public void testSendRequestNeedsBackend() {
193         queue.sendRequest(mockRequest, mockCallback);
194         final Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
195         assertNotNull(ret);
196         assertTrue(ret.isPresent());
197     }
198
199     @Test
200     public void testSetBackendWithNoRequests() {
201         // this utility method covers the entire test
202         setupBackend();
203     }
204
205     @Test
206     public void testSendRequestNeedsTimer() {
207         setupBackend();
208
209         queue.sendRequest(mockRequest, mockCallback);
210         final Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
211         assertNotNull(ret);
212         assertTrue(ret.isPresent());
213         assertTransmit(mockRequest, 0);
214     }
215
216     @Test
217     public void testRunTimeoutEmpty() throws NoProgressException {
218         Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
219         assertNotNull(ret);
220         assertFalse(ret.isPresent());
221     }
222
223     @Test
224     public void testRunTimeoutWithoutShift() throws NoProgressException {
225         queue.sendRequest(mockRequest, mockCallback);
226         Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
227         assertNotNull(ret);
228         assertTrue(ret.isPresent());
229     }
230
231     @Test
232     public void testRunTimeoutWithTimeoutLess() throws NoProgressException {
233         queue.sendRequest(mockRequest, mockCallback);
234
235         ticker.increment(AbstractClientConnection.REQUEST_TIMEOUT_NANOS - 1);
236
237         Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
238         assertNotNull(ret);
239         assertTrue(ret.isPresent());
240     }
241
242     @Test
243     public void testRunTimeoutWithTimeoutExact() throws NoProgressException {
244         setupBackend();
245
246         queue.sendRequest(mockRequest, mockCallback);
247
248         ticker.increment(AbstractClientConnection.REQUEST_TIMEOUT_NANOS);
249
250         Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
251         assertNull(ret);
252     }
253
254     @Test
255     public void testRunTimeoutWithTimeoutMore() throws NoProgressException {
256         setupBackend();
257
258         queue.sendRequest(mockRequest, mockCallback);
259
260         ticker.increment(AbstractClientConnection.REQUEST_TIMEOUT_NANOS + 1);
261
262         Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
263         assertNull(ret);
264     }
265
266     @SuppressWarnings({ "rawtypes", "unchecked" })
267     public void testRunTimeoutWithoutProgressExact() throws NoProgressException {
268         queue.sendRequest(mockRequest, mockCallback);
269
270         ticker.increment(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS);
271
272         // Kaboom
273         queue.runTimer((ClientActorBehavior) mockBehavior);
274         assertNotNull(queue.poisoned());
275     }
276
277     @SuppressWarnings({ "rawtypes", "unchecked" })
278     public void testRunTimeoutWithoutProgressMore() throws NoProgressException {
279         queue.sendRequest(mockRequest, mockCallback);
280
281         ticker.increment(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS + 1);
282
283         // Kaboom
284         queue.runTimer((ClientActorBehavior) mockBehavior);
285         assertNotNull(queue.poisoned());
286     }
287
288     @Test
289     public void testRunTimeoutEmptyWithoutProgressExact() throws NoProgressException {
290         ticker.increment(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS);
291
292         // No problem
293         Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
294         assertNotNull(ret);
295         assertFalse(ret.isPresent());
296     }
297
298     @Test
299     public void testRunTimeoutEmptyWithoutProgressMore() throws NoProgressException {
300         ticker.increment(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS + 1);
301
302         // No problem
303         Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
304         assertNotNull(ret);
305         assertFalse(ret.isPresent());
306     }
307
308     @Test
309     public void testCompleteEmpty() {
310         queue.receiveResponse(mockResponseEnvelope);
311         verifyNoMoreInteractions(mockCallback);
312     }
313
314     @Test
315     public void testCompleteSingle() {
316         setupBackend();
317
318         queue.sendRequest(mockRequest, mockCallback);
319
320         queue.receiveResponse(mockResponseEnvelope);
321         verify(mockCallback).accept(mockResponse);
322
323         queue.receiveResponse(mockResponseEnvelope);
324         verifyNoMoreInteractions(mockCallback);
325     }
326
327     @Test
328     public void testCompleteNull() {
329         setupBackend();
330
331         queue.sendRequest(mockRequest, mockCallback);
332
333         doNothing().when(mockCallback).accept(mockResponse);
334
335         queue.receiveResponse(mockResponseEnvelope);
336         verify(mockCallback).accept(mockResponse);
337     }
338
339     @Test
340     public void testProgressRecord() throws NoProgressException {
341         setupBackend();
342
343         queue.sendRequest(mockRequest, mockCallback);
344
345         ticker.increment(10);
346         queue.sendRequest(mockRequest2, mockCallback);
347         queue.receiveResponse(mockResponseEnvelope);
348
349         ticker.increment(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS - 11);
350
351         Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
352         assertNull(ret);
353     }
354
355     private void setupBackend() {
356         final ConnectedClientConnection<?> newConn = new ConnectedClientConnection<>(mockContext, mockCookie,
357                 mockBackendInfo);
358         queue.setForwarder(new SimpleReconnectForwarder(newConn));
359         queue = newConn;
360     }
361
362     private void assertTransmit(final Request<?, ?> expected, final long sequence) {
363         assertTrue(mockActor.msgAvailable());
364         assertRequestEquals(expected, sequence, mockActor.receiveOne(FiniteDuration.apply(5, TimeUnit.SECONDS)));
365     }
366
367     private static void assertRequestEquals(final Request<?, ?> expected, final long sequence, final Object obj) {
368         assertTrue(obj instanceof RequestEnvelope);
369
370         final RequestEnvelope actual = (RequestEnvelope) obj;
371         assertEquals(0, actual.getSessionId());
372         assertEquals(sequence, actual.getTxSequence());
373         assertSame(expected, actual.getMessage());
374     }
375 }