Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / cds-access-client / src / test / java / org / opendaylight / controller / cluster / access / client / ConnectingClientConnectionTest.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.ArgumentMatchers.any;
17 import static org.mockito.Mockito.doNothing;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.verify;
21 import static org.mockito.Mockito.verifyNoMoreInteractions;
22
23 import akka.actor.ActorRef;
24 import akka.actor.ActorSystem;
25 import akka.testkit.TestProbe;
26 import com.google.common.testing.FakeTicker;
27 import java.util.OptionalLong;
28 import java.util.concurrent.ThreadLocalRandom;
29 import java.util.concurrent.TimeUnit;
30 import java.util.function.Consumer;
31 import org.junit.After;
32 import org.junit.AfterClass;
33 import org.junit.Before;
34 import org.junit.BeforeClass;
35 import org.junit.Test;
36 import org.junit.runner.RunWith;
37 import org.mockito.ArgumentCaptor;
38 import org.mockito.Mock;
39 import org.mockito.junit.MockitoJUnitRunner;
40 import org.opendaylight.controller.cluster.access.ABIVersion;
41 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
42 import org.opendaylight.controller.cluster.access.concepts.Request;
43 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
44 import org.opendaylight.controller.cluster.access.concepts.RequestException;
45 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
46 import org.opendaylight.controller.cluster.access.concepts.Response;
47 import org.opendaylight.controller.cluster.messaging.MessageSlicer;
48 import org.opendaylight.yangtools.concepts.WritableIdentifier;
49 import scala.concurrent.duration.FiniteDuration;
50
51 /**
52  * Test suite covering logic contained in {@link ConnectingClientConnection}. It assumes {@link ConnectionEntryTest}
53  * passes.
54  */
55 @RunWith(MockitoJUnitRunner.class)
56 public class ConnectingClientConnectionTest {
57     private static class MockFailure extends RequestFailure<WritableIdentifier, MockFailure> {
58         private static final long serialVersionUID = 1L;
59
60         MockFailure(final WritableIdentifier target, final RequestException cause) {
61             super(target, 0, cause);
62         }
63
64         @Override
65         protected SerialForm<WritableIdentifier, MockFailure> externalizableProxy(final ABIVersion version) {
66             return null;
67         }
68
69         @Override
70         protected MockFailure cloneAsVersion(final ABIVersion version) {
71             return this;
72         }
73     }
74
75     private static class MockRequest extends Request<WritableIdentifier, MockRequest> {
76         private static final long serialVersionUID = 1L;
77
78         MockRequest(final WritableIdentifier target, final ActorRef replyTo) {
79             super(target, 0, replyTo);
80         }
81
82         @Override
83         public RequestFailure<WritableIdentifier, ?> toRequestFailure(final RequestException cause) {
84             return new MockFailure(getTarget(), cause);
85         }
86
87         @Override
88         protected Request.SerialForm<WritableIdentifier, MockRequest> externalizableProxy(final ABIVersion version) {
89             return null;
90         }
91
92         @Override
93         protected MockRequest cloneAsVersion(final ABIVersion version) {
94             return this;
95         }
96     }
97
98     @Mock
99     private ActorRef mockReplyTo;
100     @Mock
101     private WritableIdentifier mockIdentifier;
102     @Mock
103     private RequestException mockCause;
104     @Mock
105     private Consumer<Response<?, ?>> mockCallback;
106     @Mock
107     private ClientActorBehavior<?> mockBehavior;
108     @Mock
109     private ClientActorContext mockContext;
110
111     private FakeTicker ticker;
112     private BackendInfo mockBackendInfo;
113     private MockRequest mockRequest;
114     private MockRequest mockRequest2;
115     private RequestFailure<WritableIdentifier, ?> mockResponse;
116     private FailureEnvelope mockResponseEnvelope;
117     private Long mockCookie;
118
119     private static ActorSystem actorSystem;
120     private TestProbe mockActor;
121
122     private AbstractClientConnection<?> queue;
123
124     @BeforeClass
125     public static void setupClass() {
126         actorSystem = ActorSystem.apply();
127     }
128
129     @AfterClass
130     public static void teardownClass() {
131         actorSystem.terminate();
132     }
133
134     @Before
135     public void setup() {
136         doNothing().when(mockCallback).accept(any(MockFailure.class));
137
138         ticker = new FakeTicker();
139         ticker.advance(ThreadLocalRandom.current().nextLong());
140         doReturn(ticker).when(mockContext).ticker();
141
142         final ClientActorConfig mockConfig = AccessClientUtil.newMockClientActorConfig();
143         doReturn(mockConfig).when(mockContext).config();
144
145         doReturn(mock(MessageSlicer.class)).when(mockContext).messageSlicer();
146
147         mockActor = TestProbe.apply(actorSystem);
148         mockBackendInfo = new BackendInfo(mockActor.ref(), "test", 0, ABIVersion.current(), 5);
149         mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
150         mockRequest2 = new MockRequest(mockIdentifier, mockReplyTo);
151         mockResponse = mockRequest.toRequestFailure(mockCause);
152         mockResponseEnvelope = new FailureEnvelope(mockResponse, 0, 0, 0);
153         mockCookie = ThreadLocalRandom.current().nextLong();
154
155         queue = new ConnectingClientConnection<>(mockContext, mockCookie, mockBackendInfo.getName());
156     }
157
158     @After
159     public void teardown() {
160         actorSystem.stop(mockActor.ref());
161     }
162
163     @Test
164     public void testCookie() {
165         assertEquals(mockCookie, queue.cookie());
166     }
167
168     @Test
169     public void testPoison() {
170         queue.sendRequest(mockRequest, mockCallback);
171         queue.poison(mockCause);
172
173         final ArgumentCaptor<MockFailure> captor = ArgumentCaptor.forClass(MockFailure.class);
174         verify(mockCallback).accept(captor.capture());
175         assertSame(mockCause, captor.getValue().getCause());
176     }
177
178     @Test(expected = IllegalStateException.class)
179     public void testPoisonPerformsClose() {
180         // Implies close()
181         queue.poison(mockCause);
182
183         // Kaboom
184         queue.sendRequest(mockRequest, mockCallback);
185     }
186
187     @Test
188     public void testPoisonIdempotent() {
189         queue.poison(mockCause);
190         queue.poison(mockCause);
191     }
192
193     @Test
194     public void testSendRequestNeedsBackend() {
195         queue.sendRequest(mockRequest, mockCallback);
196         final OptionalLong ret = queue.checkTimeout(ticker.read());
197         assertNotNull(ret);
198         assertTrue(ret.isPresent());
199     }
200
201     @Test
202     public void testSetBackendWithNoRequests() {
203         // this utility method covers the entire test
204         setupBackend();
205     }
206
207     @Test
208     public void testSendRequestNeedsTimer() {
209         setupBackend();
210
211         queue.sendRequest(mockRequest, mockCallback);
212         final OptionalLong ret = queue.checkTimeout(ticker.read());
213         assertNotNull(ret);
214         assertTrue(ret.isPresent());
215         assertTransmit(mockRequest, 0);
216     }
217
218     @Test
219     public void testRunTimeoutEmpty() {
220         OptionalLong ret = queue.checkTimeout(ticker.read());
221         assertNotNull(ret);
222         assertFalse(ret.isPresent());
223     }
224
225     @Test
226     public void testRunTimeoutWithoutShift() {
227         queue.sendRequest(mockRequest, mockCallback);
228         OptionalLong ret = queue.checkTimeout(ticker.read());
229         assertNotNull(ret);
230         assertTrue(ret.isPresent());
231     }
232
233     @Test
234     public void testRunTimeoutWithTimeoutLess() {
235         queue.sendRequest(mockRequest, mockCallback);
236
237         ticker.advance(AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS - 1);
238
239         OptionalLong ret = queue.checkTimeout(ticker.read());
240         assertNotNull(ret);
241         assertTrue(ret.isPresent());
242     }
243
244     @Test
245     public void testRunTimeoutWithTimeoutExact() {
246         setupBackend();
247
248         queue.sendRequest(mockRequest, mockCallback);
249
250         ticker.advance(AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS);
251
252         OptionalLong ret = queue.checkTimeout(ticker.read());
253         assertNull(ret);
254     }
255
256     @Test
257     public void testRunTimeoutWithTimeoutMore() {
258         setupBackend();
259
260         queue.sendRequest(mockRequest, mockCallback);
261
262         ticker.advance(AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS + 1);
263
264         assertNull(queue.checkTimeout(ticker.read()));
265     }
266
267     @SuppressWarnings({ "rawtypes", "unchecked" })
268     public void testRunTimeoutWithoutProgressExact() {
269         queue.sendRequest(mockRequest, mockCallback);
270
271         ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS);
272
273         // Kaboom
274         queue.runTimer((ClientActorBehavior) mockBehavior);
275         assertNotNull(queue.poisoned());
276     }
277
278     @SuppressWarnings({ "rawtypes", "unchecked" })
279     public void testRunTimeoutWithoutProgressMore() {
280         queue.sendRequest(mockRequest, mockCallback);
281
282         ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS + 1);
283
284         // Kaboom
285         queue.runTimer((ClientActorBehavior) mockBehavior);
286         assertNotNull(queue.poisoned());
287     }
288
289     @Test
290     public void testRunTimeoutEmptyWithoutProgressExact() {
291         ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS);
292
293         // No problem
294         assertEquals(OptionalLong.empty(), queue.checkTimeout(ticker.read()));
295     }
296
297     @Test
298     public void testRunTimeoutEmptyWithoutProgressMore() {
299         ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS + 1);
300
301         // No problem
302         assertEquals(OptionalLong.empty(), queue.checkTimeout(ticker.read()));
303     }
304
305     @Test
306     public void testCompleteEmpty() {
307         queue.receiveResponse(mockResponseEnvelope);
308         verifyNoMoreInteractions(mockCallback);
309     }
310
311     @Test
312     public void testCompleteSingle() {
313         setupBackend();
314
315         queue.sendRequest(mockRequest, mockCallback);
316
317         queue.receiveResponse(mockResponseEnvelope);
318         verify(mockCallback).accept(mockResponse);
319
320         queue.receiveResponse(mockResponseEnvelope);
321         verifyNoMoreInteractions(mockCallback);
322     }
323
324     @Test
325     public void testCompleteNull() {
326         setupBackend();
327
328         queue.sendRequest(mockRequest, mockCallback);
329
330         doNothing().when(mockCallback).accept(mockResponse);
331
332         queue.receiveResponse(mockResponseEnvelope);
333         verify(mockCallback).accept(mockResponse);
334     }
335
336     @Test
337     public void testProgressRecord() {
338         setupBackend();
339
340         queue.sendRequest(mockRequest, mockCallback);
341
342         ticker.advance(10);
343         queue.sendRequest(mockRequest2, mockCallback);
344         queue.receiveResponse(mockResponseEnvelope);
345
346         ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS - 11);
347
348         assertNull(queue.checkTimeout(ticker.read()));
349     }
350
351     private void setupBackend() {
352         final ConnectingClientConnection<BackendInfo> connectingConn =
353                 new ConnectingClientConnection<>(mockContext, mockCookie, "test");
354         final ConnectedClientConnection<BackendInfo> connectedConn =
355                 new ConnectedClientConnection<>(connectingConn, mockBackendInfo);
356         queue.setForwarder(new SimpleReconnectForwarder(connectedConn));
357         queue = connectedConn;
358     }
359
360     private void assertTransmit(final Request<?, ?> expected, final long sequence) {
361         assertTrue(mockActor.msgAvailable());
362         assertRequestEquals(expected, sequence, mockActor.receiveOne(FiniteDuration.apply(5, TimeUnit.SECONDS)));
363     }
364
365     private static void assertRequestEquals(final Request<?, ?> expected, final long sequence, final Object obj) {
366         assertTrue(obj instanceof RequestEnvelope);
367
368         final RequestEnvelope actual = (RequestEnvelope) obj;
369         assertEquals(0, actual.getSessionId());
370         assertEquals(sequence, actual.getTxSequence());
371         assertSame(expected, actual.getMessage());
372     }
373 }