Slice front-end request messages
[controller.git] / opendaylight / md-sal / cds-access-client / src / test / java / org / opendaylight / controller / cluster / access / client / ConnectingClientConnectionTest.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Matchers.any;
17 import static org.mockito.Mockito.doNothing;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.verify;
21 import static org.mockito.Mockito.verifyNoMoreInteractions;
22
23 import akka.actor.ActorRef;
24 import akka.actor.ActorSystem;
25 import akka.testkit.TestProbe;
26 import com.google.common.testing.FakeTicker;
27 import java.util.Optional;
28 import java.util.concurrent.ThreadLocalRandom;
29 import java.util.concurrent.TimeUnit;
30 import java.util.function.Consumer;
31 import org.junit.After;
32 import org.junit.AfterClass;
33 import org.junit.Before;
34 import org.junit.BeforeClass;
35 import org.junit.Test;
36 import org.mockito.ArgumentCaptor;
37 import org.mockito.Mock;
38 import org.mockito.MockitoAnnotations;
39 import org.opendaylight.controller.cluster.access.ABIVersion;
40 import org.opendaylight.controller.cluster.access.concepts.AbstractRequestFailureProxy;
41 import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy;
42 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
43 import org.opendaylight.controller.cluster.access.concepts.Request;
44 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
45 import org.opendaylight.controller.cluster.access.concepts.RequestException;
46 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
47 import org.opendaylight.controller.cluster.access.concepts.Response;
48 import org.opendaylight.controller.cluster.messaging.MessageSlicer;
49 import org.opendaylight.yangtools.concepts.WritableIdentifier;
50 import scala.concurrent.duration.FiniteDuration;
51
52 /**
53  * Test suite covering logic contained in {@link ConnectingClientConnection}. It assumes {@link ConnectionEntryTest}
54  * passes.
55  *
56  * @author Robert Varga
57  */
58 public class ConnectingClientConnectionTest {
59     private static class MockFailure extends RequestFailure<WritableIdentifier, MockFailure> {
60         private static final long serialVersionUID = 1L;
61
62         MockFailure(final WritableIdentifier target, final RequestException cause) {
63             super(target, 0, cause);
64         }
65
66         @Override
67         protected AbstractRequestFailureProxy<WritableIdentifier, MockFailure> externalizableProxy(
68                 final ABIVersion version) {
69             return null;
70         }
71
72         @Override
73         protected MockFailure cloneAsVersion(final ABIVersion version) {
74             return this;
75         }
76     }
77
78     private static class MockRequest extends Request<WritableIdentifier, MockRequest> {
79         private static final long serialVersionUID = 1L;
80
81         MockRequest(final WritableIdentifier target, final ActorRef replyTo) {
82             super(target, 0, replyTo);
83         }
84
85         @Override
86         public RequestFailure<WritableIdentifier, ?> toRequestFailure(final RequestException cause) {
87             return new MockFailure(getTarget(), cause);
88         }
89
90         @Override
91         protected AbstractRequestProxy<WritableIdentifier, MockRequest> externalizableProxy(final ABIVersion version) {
92             return null;
93         }
94
95         @Override
96         protected MockRequest cloneAsVersion(final ABIVersion version) {
97             return this;
98         }
99     }
100
101     @Mock
102     private ActorRef mockReplyTo;
103     @Mock
104     private WritableIdentifier mockIdentifier;
105     @Mock
106     private RequestException mockCause;
107     @Mock
108     private Consumer<Response<?, ?>> mockCallback;
109     @Mock
110     private ClientActorBehavior<?> mockBehavior;
111     @Mock
112     private ClientActorContext mockContext;
113
114     private FakeTicker ticker;
115     private BackendInfo mockBackendInfo;
116     private MockRequest mockRequest;
117     private MockRequest mockRequest2;
118     private RequestFailure<WritableIdentifier, ?> mockResponse;
119     private FailureEnvelope mockResponseEnvelope;
120     private Long mockCookie;
121
122     private static ActorSystem actorSystem;
123     private TestProbe mockActor;
124
125     private AbstractClientConnection<?> queue;
126
127     @BeforeClass
128     public static void setupClass() {
129         actorSystem = ActorSystem.apply();
130     }
131
132     @AfterClass
133     public static void teardownClass() {
134         actorSystem.terminate();
135     }
136
137     @Before
138     public void setup() {
139         MockitoAnnotations.initMocks(this);
140
141         doNothing().when(mockCallback).accept(any(MockFailure.class));
142
143         ticker = new FakeTicker();
144         ticker.advance(ThreadLocalRandom.current().nextLong());
145         doReturn(ticker).when(mockContext).ticker();
146
147         final ClientActorConfig mockConfig = AccessClientUtil.newMockClientActorConfig();
148         doReturn(mockConfig).when(mockContext).config();
149
150         doReturn(mock(MessageSlicer.class)).when(mockContext).messageSlicer();
151
152         mockActor = TestProbe.apply(actorSystem);
153         mockBackendInfo = new BackendInfo(mockActor.ref(), 0, ABIVersion.current(), 5);
154         mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
155         mockRequest2 = new MockRequest(mockIdentifier, mockReplyTo);
156         mockResponse = mockRequest.toRequestFailure(mockCause);
157         mockResponseEnvelope = new FailureEnvelope(mockResponse, 0, 0, 0);
158         mockCookie = ThreadLocalRandom.current().nextLong();
159
160         queue = new ConnectingClientConnection<>(mockContext, mockCookie);
161     }
162
163     @After
164     public void teardown() {
165         actorSystem.stop(mockActor.ref());
166     }
167
168     @Test
169     public void testCookie() {
170         assertEquals(mockCookie, queue.cookie());
171     }
172
173     @Test
174     public void testPoison() {
175         queue.sendRequest(mockRequest, mockCallback);
176         queue.poison(mockCause);
177
178         final ArgumentCaptor<MockFailure> captor = ArgumentCaptor.forClass(MockFailure.class);
179         verify(mockCallback).accept(captor.capture());
180         assertSame(mockCause, captor.getValue().getCause());
181     }
182
183     @Test(expected = IllegalStateException.class)
184     public void testPoisonPerformsClose() {
185         // Implies close()
186         queue.poison(mockCause);
187
188         // Kaboom
189         queue.sendRequest(mockRequest, mockCallback);
190     }
191
192     @Test
193     public void testPoisonIdempotent() {
194         queue.poison(mockCause);
195         queue.poison(mockCause);
196     }
197
198     @Test
199     public void testSendRequestNeedsBackend() {
200         queue.sendRequest(mockRequest, mockCallback);
201         final Optional<Long> ret = queue.checkTimeout(ticker.read());
202         assertNotNull(ret);
203         assertTrue(ret.isPresent());
204     }
205
206     @Test
207     public void testSetBackendWithNoRequests() {
208         // this utility method covers the entire test
209         setupBackend();
210     }
211
212     @Test
213     public void testSendRequestNeedsTimer() {
214         setupBackend();
215
216         queue.sendRequest(mockRequest, mockCallback);
217         final Optional<Long> ret = queue.checkTimeout(ticker.read());
218         assertNotNull(ret);
219         assertTrue(ret.isPresent());
220         assertTransmit(mockRequest, 0);
221     }
222
223     @Test
224     public void testRunTimeoutEmpty() throws NoProgressException {
225         Optional<Long> ret = queue.checkTimeout(ticker.read());
226         assertNotNull(ret);
227         assertFalse(ret.isPresent());
228     }
229
230     @Test
231     public void testRunTimeoutWithoutShift() throws NoProgressException {
232         queue.sendRequest(mockRequest, mockCallback);
233         Optional<Long> ret = queue.checkTimeout(ticker.read());
234         assertNotNull(ret);
235         assertTrue(ret.isPresent());
236     }
237
238     @Test
239     public void testRunTimeoutWithTimeoutLess() throws NoProgressException {
240         queue.sendRequest(mockRequest, mockCallback);
241
242         ticker.advance(AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS - 1);
243
244         Optional<Long> ret = queue.checkTimeout(ticker.read());
245         assertNotNull(ret);
246         assertTrue(ret.isPresent());
247     }
248
249     @Test
250     public void testRunTimeoutWithTimeoutExact() throws NoProgressException {
251         setupBackend();
252
253         queue.sendRequest(mockRequest, mockCallback);
254
255         ticker.advance(AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS);
256
257         Optional<Long> ret = queue.checkTimeout(ticker.read());
258         assertNull(ret);
259     }
260
261     @Test
262     public void testRunTimeoutWithTimeoutMore() throws NoProgressException {
263         setupBackend();
264
265         queue.sendRequest(mockRequest, mockCallback);
266
267         ticker.advance(AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS + 1);
268
269         Optional<Long> ret = queue.checkTimeout(ticker.read());
270         assertNull(ret);
271     }
272
273     @SuppressWarnings({ "rawtypes", "unchecked" })
274     public void testRunTimeoutWithoutProgressExact() throws NoProgressException {
275         queue.sendRequest(mockRequest, mockCallback);
276
277         ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS);
278
279         // Kaboom
280         queue.runTimer((ClientActorBehavior) mockBehavior);
281         assertNotNull(queue.poisoned());
282     }
283
284     @SuppressWarnings({ "rawtypes", "unchecked" })
285     public void testRunTimeoutWithoutProgressMore() throws NoProgressException {
286         queue.sendRequest(mockRequest, mockCallback);
287
288         ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS + 1);
289
290         // Kaboom
291         queue.runTimer((ClientActorBehavior) mockBehavior);
292         assertNotNull(queue.poisoned());
293     }
294
295     @Test
296     public void testRunTimeoutEmptyWithoutProgressExact() throws NoProgressException {
297         ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS);
298
299         // No problem
300         Optional<Long> ret = queue.checkTimeout(ticker.read());
301         assertNotNull(ret);
302         assertFalse(ret.isPresent());
303     }
304
305     @Test
306     public void testRunTimeoutEmptyWithoutProgressMore() throws NoProgressException {
307         ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS + 1);
308
309         // No problem
310         Optional<Long> ret = queue.checkTimeout(ticker.read());
311         assertNotNull(ret);
312         assertFalse(ret.isPresent());
313     }
314
315     @Test
316     public void testCompleteEmpty() {
317         queue.receiveResponse(mockResponseEnvelope);
318         verifyNoMoreInteractions(mockCallback);
319     }
320
321     @Test
322     public void testCompleteSingle() {
323         setupBackend();
324
325         queue.sendRequest(mockRequest, mockCallback);
326
327         queue.receiveResponse(mockResponseEnvelope);
328         verify(mockCallback).accept(mockResponse);
329
330         queue.receiveResponse(mockResponseEnvelope);
331         verifyNoMoreInteractions(mockCallback);
332     }
333
334     @Test
335     public void testCompleteNull() {
336         setupBackend();
337
338         queue.sendRequest(mockRequest, mockCallback);
339
340         doNothing().when(mockCallback).accept(mockResponse);
341
342         queue.receiveResponse(mockResponseEnvelope);
343         verify(mockCallback).accept(mockResponse);
344     }
345
346     @Test
347     public void testProgressRecord() throws NoProgressException {
348         setupBackend();
349
350         queue.sendRequest(mockRequest, mockCallback);
351
352         ticker.advance(10);
353         queue.sendRequest(mockRequest2, mockCallback);
354         queue.receiveResponse(mockResponseEnvelope);
355
356         ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS - 11);
357
358         Optional<Long> ret = queue.checkTimeout(ticker.read());
359         assertNull(ret);
360     }
361
362     private void setupBackend() {
363         final ConnectingClientConnection<BackendInfo> connectingConn =
364                 new ConnectingClientConnection<>(mockContext, mockCookie);
365         final ConnectedClientConnection<BackendInfo> connectedConn =
366                 new ConnectedClientConnection<>(connectingConn, mockBackendInfo);
367         queue.setForwarder(new SimpleReconnectForwarder(connectedConn));
368         queue = connectedConn;
369     }
370
371     private void assertTransmit(final Request<?, ?> expected, final long sequence) {
372         assertTrue(mockActor.msgAvailable());
373         assertRequestEquals(expected, sequence, mockActor.receiveOne(FiniteDuration.apply(5, TimeUnit.SECONDS)));
374     }
375
376     private static void assertRequestEquals(final Request<?, ?> expected, final long sequence, final Object obj) {
377         assertTrue(obj instanceof RequestEnvelope);
378
379         final RequestEnvelope actual = (RequestEnvelope) obj;
380         assertEquals(0, actual.getSessionId());
381         assertEquals(sequence, actual.getTxSequence());
382         assertSame(expected, actual.getMessage());
383     }
384 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.