2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.access.client;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.ArgumentMatchers.any;
17 import static org.mockito.Mockito.doNothing;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.verify;
21 import static org.mockito.Mockito.verifyNoMoreInteractions;
23 import akka.actor.ActorRef;
24 import akka.actor.ActorSystem;
25 import akka.testkit.TestProbe;
26 import com.google.common.testing.FakeTicker;
27 import java.util.Optional;
28 import java.util.concurrent.ThreadLocalRandom;
29 import java.util.concurrent.TimeUnit;
30 import java.util.function.Consumer;
31 import org.junit.After;
32 import org.junit.AfterClass;
33 import org.junit.Before;
34 import org.junit.BeforeClass;
35 import org.junit.Test;
36 import org.mockito.ArgumentCaptor;
37 import org.mockito.Mock;
38 import org.mockito.MockitoAnnotations;
39 import org.opendaylight.controller.cluster.access.ABIVersion;
40 import org.opendaylight.controller.cluster.access.concepts.AbstractRequestFailureProxy;
41 import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy;
42 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
43 import org.opendaylight.controller.cluster.access.concepts.Request;
44 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
45 import org.opendaylight.controller.cluster.access.concepts.RequestException;
46 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
47 import org.opendaylight.controller.cluster.access.concepts.Response;
48 import org.opendaylight.controller.cluster.messaging.MessageSlicer;
49 import org.opendaylight.yangtools.concepts.WritableIdentifier;
50 import scala.concurrent.duration.FiniteDuration;
53 * Test suite covering logic contained in {@link ConnectingClientConnection}. It assumes {@link ConnectionEntryTest}
56 * @author Robert Varga
58 public class ConnectingClientConnectionTest {
59 private static class MockFailure extends RequestFailure<WritableIdentifier, MockFailure> {
60 private static final long serialVersionUID = 1L;
62 MockFailure(final WritableIdentifier target, final RequestException cause) {
63 super(target, 0, cause);
67 protected AbstractRequestFailureProxy<WritableIdentifier, MockFailure> externalizableProxy(
68 final ABIVersion version) {
73 protected MockFailure cloneAsVersion(final ABIVersion version) {
78 private static class MockRequest extends Request<WritableIdentifier, MockRequest> {
79 private static final long serialVersionUID = 1L;
81 MockRequest(final WritableIdentifier target, final ActorRef replyTo) {
82 super(target, 0, replyTo);
86 public RequestFailure<WritableIdentifier, ?> toRequestFailure(final RequestException cause) {
87 return new MockFailure(getTarget(), cause);
91 protected AbstractRequestProxy<WritableIdentifier, MockRequest> externalizableProxy(final ABIVersion version) {
96 protected MockRequest cloneAsVersion(final ABIVersion version) {
102 private ActorRef mockReplyTo;
104 private WritableIdentifier mockIdentifier;
106 private RequestException mockCause;
108 private Consumer<Response<?, ?>> mockCallback;
110 private ClientActorBehavior<?> mockBehavior;
112 private ClientActorContext mockContext;
114 private FakeTicker ticker;
115 private BackendInfo mockBackendInfo;
116 private MockRequest mockRequest;
117 private MockRequest mockRequest2;
118 private RequestFailure<WritableIdentifier, ?> mockResponse;
119 private FailureEnvelope mockResponseEnvelope;
120 private Long mockCookie;
122 private static ActorSystem actorSystem;
123 private TestProbe mockActor;
125 private AbstractClientConnection<?> queue;
128 public static void setupClass() {
129 actorSystem = ActorSystem.apply();
133 public static void teardownClass() {
134 actorSystem.terminate();
138 public void setup() {
139 MockitoAnnotations.initMocks(this);
141 doNothing().when(mockCallback).accept(any(MockFailure.class));
143 ticker = new FakeTicker();
144 ticker.advance(ThreadLocalRandom.current().nextLong());
145 doReturn(ticker).when(mockContext).ticker();
147 final ClientActorConfig mockConfig = AccessClientUtil.newMockClientActorConfig();
148 doReturn(mockConfig).when(mockContext).config();
150 doReturn(mock(MessageSlicer.class)).when(mockContext).messageSlicer();
152 mockActor = TestProbe.apply(actorSystem);
153 mockBackendInfo = new BackendInfo(mockActor.ref(), "test", 0, ABIVersion.current(), 5);
154 mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
155 mockRequest2 = new MockRequest(mockIdentifier, mockReplyTo);
156 mockResponse = mockRequest.toRequestFailure(mockCause);
157 mockResponseEnvelope = new FailureEnvelope(mockResponse, 0, 0, 0);
158 mockCookie = ThreadLocalRandom.current().nextLong();
160 queue = new ConnectingClientConnection<>(mockContext, mockCookie, mockBackendInfo.getName());
164 public void teardown() {
165 actorSystem.stop(mockActor.ref());
169 public void testCookie() {
170 assertEquals(mockCookie, queue.cookie());
174 public void testPoison() {
175 queue.sendRequest(mockRequest, mockCallback);
176 queue.poison(mockCause);
178 final ArgumentCaptor<MockFailure> captor = ArgumentCaptor.forClass(MockFailure.class);
179 verify(mockCallback).accept(captor.capture());
180 assertSame(mockCause, captor.getValue().getCause());
183 @Test(expected = IllegalStateException.class)
184 public void testPoisonPerformsClose() {
186 queue.poison(mockCause);
189 queue.sendRequest(mockRequest, mockCallback);
193 public void testPoisonIdempotent() {
194 queue.poison(mockCause);
195 queue.poison(mockCause);
199 public void testSendRequestNeedsBackend() {
200 queue.sendRequest(mockRequest, mockCallback);
201 final Optional<Long> ret = queue.checkTimeout(ticker.read());
203 assertTrue(ret.isPresent());
207 public void testSetBackendWithNoRequests() {
208 // this utility method covers the entire test
213 public void testSendRequestNeedsTimer() {
216 queue.sendRequest(mockRequest, mockCallback);
217 final Optional<Long> ret = queue.checkTimeout(ticker.read());
219 assertTrue(ret.isPresent());
220 assertTransmit(mockRequest, 0);
224 public void testRunTimeoutEmpty() {
225 Optional<Long> ret = queue.checkTimeout(ticker.read());
227 assertFalse(ret.isPresent());
231 public void testRunTimeoutWithoutShift() {
232 queue.sendRequest(mockRequest, mockCallback);
233 Optional<Long> ret = queue.checkTimeout(ticker.read());
235 assertTrue(ret.isPresent());
239 public void testRunTimeoutWithTimeoutLess() {
240 queue.sendRequest(mockRequest, mockCallback);
242 ticker.advance(AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS - 1);
244 Optional<Long> ret = queue.checkTimeout(ticker.read());
246 assertTrue(ret.isPresent());
250 public void testRunTimeoutWithTimeoutExact() {
253 queue.sendRequest(mockRequest, mockCallback);
255 ticker.advance(AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS);
257 Optional<Long> ret = queue.checkTimeout(ticker.read());
262 public void testRunTimeoutWithTimeoutMore() {
265 queue.sendRequest(mockRequest, mockCallback);
267 ticker.advance(AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS + 1);
269 Optional<Long> ret = queue.checkTimeout(ticker.read());
273 @SuppressWarnings({ "rawtypes", "unchecked" })
274 public void testRunTimeoutWithoutProgressExact() {
275 queue.sendRequest(mockRequest, mockCallback);
277 ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS);
280 queue.runTimer((ClientActorBehavior) mockBehavior);
281 assertNotNull(queue.poisoned());
284 @SuppressWarnings({ "rawtypes", "unchecked" })
285 public void testRunTimeoutWithoutProgressMore() {
286 queue.sendRequest(mockRequest, mockCallback);
288 ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS + 1);
291 queue.runTimer((ClientActorBehavior) mockBehavior);
292 assertNotNull(queue.poisoned());
296 public void testRunTimeoutEmptyWithoutProgressExact() {
297 ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS);
300 Optional<Long> ret = queue.checkTimeout(ticker.read());
302 assertFalse(ret.isPresent());
306 public void testRunTimeoutEmptyWithoutProgressMore() {
307 ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS + 1);
310 Optional<Long> ret = queue.checkTimeout(ticker.read());
312 assertFalse(ret.isPresent());
316 public void testCompleteEmpty() {
317 queue.receiveResponse(mockResponseEnvelope);
318 verifyNoMoreInteractions(mockCallback);
322 public void testCompleteSingle() {
325 queue.sendRequest(mockRequest, mockCallback);
327 queue.receiveResponse(mockResponseEnvelope);
328 verify(mockCallback).accept(mockResponse);
330 queue.receiveResponse(mockResponseEnvelope);
331 verifyNoMoreInteractions(mockCallback);
335 public void testCompleteNull() {
338 queue.sendRequest(mockRequest, mockCallback);
340 doNothing().when(mockCallback).accept(mockResponse);
342 queue.receiveResponse(mockResponseEnvelope);
343 verify(mockCallback).accept(mockResponse);
347 public void testProgressRecord() {
350 queue.sendRequest(mockRequest, mockCallback);
353 queue.sendRequest(mockRequest2, mockCallback);
354 queue.receiveResponse(mockResponseEnvelope);
356 ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS - 11);
358 Optional<Long> ret = queue.checkTimeout(ticker.read());
362 private void setupBackend() {
363 final ConnectingClientConnection<BackendInfo> connectingConn =
364 new ConnectingClientConnection<>(mockContext, mockCookie, "test");
365 final ConnectedClientConnection<BackendInfo> connectedConn =
366 new ConnectedClientConnection<>(connectingConn, mockBackendInfo);
367 queue.setForwarder(new SimpleReconnectForwarder(connectedConn));
368 queue = connectedConn;
371 private void assertTransmit(final Request<?, ?> expected, final long sequence) {
372 assertTrue(mockActor.msgAvailable());
373 assertRequestEquals(expected, sequence, mockActor.receiveOne(FiniteDuration.apply(5, TimeUnit.SECONDS)));
376 private static void assertRequestEquals(final Request<?, ?> expected, final long sequence, final Object obj) {
377 assertTrue(obj instanceof RequestEnvelope);
379 final RequestEnvelope actual = (RequestEnvelope) obj;
380 assertEquals(0, actual.getSessionId());
381 assertEquals(sequence, actual.getTxSequence());
382 assertSame(expected, actual.getMessage());