b83c810aadadb2745ca05f4e24d0c69c717cd197
[controller.git] / opendaylight / md-sal / cds-access-client / src / test / java / org / opendaylight / controller / cluster / access / client / ConnectingClientConnectionTest.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Matchers.any;
17 import static org.mockito.Mockito.doNothing;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.verify;
20 import static org.mockito.Mockito.verifyNoMoreInteractions;
21
22 import akka.actor.ActorRef;
23 import akka.actor.ActorSystem;
24 import akka.testkit.TestProbe;
25 import com.google.common.testing.FakeTicker;
26 import java.util.Optional;
27 import java.util.concurrent.ThreadLocalRandom;
28 import java.util.concurrent.TimeUnit;
29 import java.util.function.Consumer;
30 import org.junit.After;
31 import org.junit.AfterClass;
32 import org.junit.Before;
33 import org.junit.BeforeClass;
34 import org.junit.Test;
35 import org.mockito.ArgumentCaptor;
36 import org.mockito.Mock;
37 import org.mockito.MockitoAnnotations;
38 import org.opendaylight.controller.cluster.access.ABIVersion;
39 import org.opendaylight.controller.cluster.access.concepts.AbstractRequestFailureProxy;
40 import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy;
41 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
42 import org.opendaylight.controller.cluster.access.concepts.Request;
43 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
44 import org.opendaylight.controller.cluster.access.concepts.RequestException;
45 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
46 import org.opendaylight.controller.cluster.access.concepts.Response;
47 import org.opendaylight.yangtools.concepts.WritableIdentifier;
48 import scala.concurrent.duration.FiniteDuration;
49
50 /**
51  * Test suite covering logic contained in {@link ConnectingClientConnection}. It assumes {@link ConnectionEntryTest}
52  * passes.
53  *
54  * @author Robert Varga
55  */
56 public class ConnectingClientConnectionTest {
57     private static class MockFailure extends RequestFailure<WritableIdentifier, MockFailure> {
58         private static final long serialVersionUID = 1L;
59
60         MockFailure(final WritableIdentifier target, final RequestException cause) {
61             super(target, 0, cause);
62         }
63
64         @Override
65         protected AbstractRequestFailureProxy<WritableIdentifier, MockFailure> externalizableProxy(
66                 final ABIVersion version) {
67             return null;
68         }
69
70         @Override
71         protected MockFailure cloneAsVersion(final ABIVersion version) {
72             return this;
73         }
74     }
75
76     private static class MockRequest extends Request<WritableIdentifier, MockRequest> {
77         private static final long serialVersionUID = 1L;
78
79         MockRequest(final WritableIdentifier target, final ActorRef replyTo) {
80             super(target, 0, replyTo);
81         }
82
83         @Override
84         public RequestFailure<WritableIdentifier, ?> toRequestFailure(final RequestException cause) {
85             return new MockFailure(getTarget(), cause);
86         }
87
88         @Override
89         protected AbstractRequestProxy<WritableIdentifier, MockRequest> externalizableProxy(final ABIVersion version) {
90             return null;
91         }
92
93         @Override
94         protected MockRequest cloneAsVersion(final ABIVersion version) {
95             return this;
96         }
97     }
98
99     @Mock
100     private ActorRef mockReplyTo;
101     @Mock
102     private WritableIdentifier mockIdentifier;
103     @Mock
104     private RequestException mockCause;
105     @Mock
106     private Consumer<Response<?, ?>> mockCallback;
107     @Mock
108     private ClientActorBehavior<?> mockBehavior;
109     @Mock
110     private ClientActorContext mockContext;
111
112     private FakeTicker ticker;
113     private BackendInfo mockBackendInfo;
114     private MockRequest mockRequest;
115     private MockRequest mockRequest2;
116     private RequestFailure<WritableIdentifier, ?> mockResponse;
117     private FailureEnvelope mockResponseEnvelope;
118     private Long mockCookie;
119
120     private static ActorSystem actorSystem;
121     private TestProbe mockActor;
122
123     private AbstractClientConnection<?> queue;
124
125     @BeforeClass
126     public static void setupClass() {
127         actorSystem = ActorSystem.apply();
128     }
129
130     @AfterClass
131     public static void teardownClass() {
132         actorSystem.terminate();
133     }
134
135     @Before
136     public void setup() {
137         MockitoAnnotations.initMocks(this);
138
139         doNothing().when(mockCallback).accept(any(MockFailure.class));
140
141         ticker = new FakeTicker();
142         ticker.advance(ThreadLocalRandom.current().nextLong());
143         doReturn(ticker).when(mockContext).ticker();
144
145         final ClientActorConfig mockConfig = AccessClientUtil.newMockClientActorConfig();
146         doReturn(mockConfig).when(mockContext).config();
147
148         mockActor = TestProbe.apply(actorSystem);
149         mockBackendInfo = new BackendInfo(mockActor.ref(), 0, ABIVersion.current(), 5);
150         mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
151         mockRequest2 = new MockRequest(mockIdentifier, mockReplyTo);
152         mockResponse = mockRequest.toRequestFailure(mockCause);
153         mockResponseEnvelope = new FailureEnvelope(mockResponse, 0, 0, 0);
154         mockCookie = ThreadLocalRandom.current().nextLong();
155
156         queue = new ConnectingClientConnection<>(mockContext, mockCookie);
157     }
158
159     @After
160     public void teardown() {
161         actorSystem.stop(mockActor.ref());
162     }
163
164     @Test
165     public void testCookie() {
166         assertEquals(mockCookie, queue.cookie());
167     }
168
169     @Test
170     public void testPoison() {
171         queue.sendRequest(mockRequest, mockCallback);
172         queue.poison(mockCause);
173
174         final ArgumentCaptor<MockFailure> captor = ArgumentCaptor.forClass(MockFailure.class);
175         verify(mockCallback).accept(captor.capture());
176         assertSame(mockCause, captor.getValue().getCause());
177     }
178
179     @Test(expected = IllegalStateException.class)
180     public void testPoisonPerformsClose() {
181         // Implies close()
182         queue.poison(mockCause);
183
184         // Kaboom
185         queue.sendRequest(mockRequest, mockCallback);
186     }
187
188     @Test
189     public void testPoisonIdempotent() {
190         queue.poison(mockCause);
191         queue.poison(mockCause);
192     }
193
194     @Test
195     public void testSendRequestNeedsBackend() {
196         queue.sendRequest(mockRequest, mockCallback);
197         final Optional<Long> ret = queue.checkTimeout(ticker.read());
198         assertNotNull(ret);
199         assertTrue(ret.isPresent());
200     }
201
202     @Test
203     public void testSetBackendWithNoRequests() {
204         // this utility method covers the entire test
205         setupBackend();
206     }
207
208     @Test
209     public void testSendRequestNeedsTimer() {
210         setupBackend();
211
212         queue.sendRequest(mockRequest, mockCallback);
213         final Optional<Long> ret = queue.checkTimeout(ticker.read());
214         assertNotNull(ret);
215         assertTrue(ret.isPresent());
216         assertTransmit(mockRequest, 0);
217     }
218
219     @Test
220     public void testRunTimeoutEmpty() throws NoProgressException {
221         Optional<Long> ret = queue.checkTimeout(ticker.read());
222         assertNotNull(ret);
223         assertFalse(ret.isPresent());
224     }
225
226     @Test
227     public void testRunTimeoutWithoutShift() throws NoProgressException {
228         queue.sendRequest(mockRequest, mockCallback);
229         Optional<Long> ret = queue.checkTimeout(ticker.read());
230         assertNotNull(ret);
231         assertTrue(ret.isPresent());
232     }
233
234     @Test
235     public void testRunTimeoutWithTimeoutLess() throws NoProgressException {
236         queue.sendRequest(mockRequest, mockCallback);
237
238         ticker.advance(AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS - 1);
239
240         Optional<Long> ret = queue.checkTimeout(ticker.read());
241         assertNotNull(ret);
242         assertTrue(ret.isPresent());
243     }
244
245     @Test
246     public void testRunTimeoutWithTimeoutExact() throws NoProgressException {
247         setupBackend();
248
249         queue.sendRequest(mockRequest, mockCallback);
250
251         ticker.advance(AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS);
252
253         Optional<Long> ret = queue.checkTimeout(ticker.read());
254         assertNull(ret);
255     }
256
257     @Test
258     public void testRunTimeoutWithTimeoutMore() throws NoProgressException {
259         setupBackend();
260
261         queue.sendRequest(mockRequest, mockCallback);
262
263         ticker.advance(AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS + 1);
264
265         Optional<Long> ret = queue.checkTimeout(ticker.read());
266         assertNull(ret);
267     }
268
269     @SuppressWarnings({ "rawtypes", "unchecked" })
270     public void testRunTimeoutWithoutProgressExact() throws NoProgressException {
271         queue.sendRequest(mockRequest, mockCallback);
272
273         ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS);
274
275         // Kaboom
276         queue.runTimer((ClientActorBehavior) mockBehavior);
277         assertNotNull(queue.poisoned());
278     }
279
280     @SuppressWarnings({ "rawtypes", "unchecked" })
281     public void testRunTimeoutWithoutProgressMore() throws NoProgressException {
282         queue.sendRequest(mockRequest, mockCallback);
283
284         ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS + 1);
285
286         // Kaboom
287         queue.runTimer((ClientActorBehavior) mockBehavior);
288         assertNotNull(queue.poisoned());
289     }
290
291     @Test
292     public void testRunTimeoutEmptyWithoutProgressExact() throws NoProgressException {
293         ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS);
294
295         // No problem
296         Optional<Long> ret = queue.checkTimeout(ticker.read());
297         assertNotNull(ret);
298         assertFalse(ret.isPresent());
299     }
300
301     @Test
302     public void testRunTimeoutEmptyWithoutProgressMore() throws NoProgressException {
303         ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS + 1);
304
305         // No problem
306         Optional<Long> ret = queue.checkTimeout(ticker.read());
307         assertNotNull(ret);
308         assertFalse(ret.isPresent());
309     }
310
311     @Test
312     public void testCompleteEmpty() {
313         queue.receiveResponse(mockResponseEnvelope);
314         verifyNoMoreInteractions(mockCallback);
315     }
316
317     @Test
318     public void testCompleteSingle() {
319         setupBackend();
320
321         queue.sendRequest(mockRequest, mockCallback);
322
323         queue.receiveResponse(mockResponseEnvelope);
324         verify(mockCallback).accept(mockResponse);
325
326         queue.receiveResponse(mockResponseEnvelope);
327         verifyNoMoreInteractions(mockCallback);
328     }
329
330     @Test
331     public void testCompleteNull() {
332         setupBackend();
333
334         queue.sendRequest(mockRequest, mockCallback);
335
336         doNothing().when(mockCallback).accept(mockResponse);
337
338         queue.receiveResponse(mockResponseEnvelope);
339         verify(mockCallback).accept(mockResponse);
340     }
341
342     @Test
343     public void testProgressRecord() throws NoProgressException {
344         setupBackend();
345
346         queue.sendRequest(mockRequest, mockCallback);
347
348         ticker.advance(10);
349         queue.sendRequest(mockRequest2, mockCallback);
350         queue.receiveResponse(mockResponseEnvelope);
351
352         ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS - 11);
353
354         Optional<Long> ret = queue.checkTimeout(ticker.read());
355         assertNull(ret);
356     }
357
358     private void setupBackend() {
359         final ConnectingClientConnection<BackendInfo> connectingConn =
360                 new ConnectingClientConnection<>(mockContext, mockCookie);
361         final ConnectedClientConnection<BackendInfo> connectedConn =
362                 new ConnectedClientConnection<>(connectingConn, mockBackendInfo);
363         queue.setForwarder(new SimpleReconnectForwarder(connectedConn));
364         queue = connectedConn;
365     }
366
367     private void assertTransmit(final Request<?, ?> expected, final long sequence) {
368         assertTrue(mockActor.msgAvailable());
369         assertRequestEquals(expected, sequence, mockActor.receiveOne(FiniteDuration.apply(5, TimeUnit.SECONDS)));
370     }
371
372     private static void assertRequestEquals(final Request<?, ?> expected, final long sequence, final Object obj) {
373         assertTrue(obj instanceof RequestEnvelope);
374
375         final RequestEnvelope actual = (RequestEnvelope) obj;
376         assertEquals(0, actual.getSessionId());
377         assertEquals(sequence, actual.getTxSequence());
378         assertSame(expected, actual.getMessage());
379     }
380 }