2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.access.client;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Matchers.any;
17 import static org.mockito.Mockito.doNothing;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.verify;
20 import static org.mockito.Mockito.verifyNoMoreInteractions;
22 import akka.actor.ActorRef;
23 import akka.actor.ActorSystem;
24 import akka.testkit.TestProbe;
25 import com.google.common.testing.FakeTicker;
26 import java.util.Optional;
27 import java.util.concurrent.ThreadLocalRandom;
28 import java.util.concurrent.TimeUnit;
29 import java.util.function.Consumer;
30 import org.junit.After;
31 import org.junit.AfterClass;
32 import org.junit.Before;
33 import org.junit.BeforeClass;
34 import org.junit.Test;
35 import org.mockito.ArgumentCaptor;
36 import org.mockito.Mock;
37 import org.mockito.MockitoAnnotations;
38 import org.opendaylight.controller.cluster.access.ABIVersion;
39 import org.opendaylight.controller.cluster.access.concepts.AbstractRequestFailureProxy;
40 import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy;
41 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
42 import org.opendaylight.controller.cluster.access.concepts.Request;
43 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
44 import org.opendaylight.controller.cluster.access.concepts.RequestException;
45 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
46 import org.opendaylight.controller.cluster.access.concepts.Response;
47 import org.opendaylight.yangtools.concepts.WritableIdentifier;
48 import scala.concurrent.duration.FiniteDuration;
51 * Test suite covering logic contained in {@link ConnectingClientConnection}. It assumes {@link ConnectionEntryTest}
54 * @author Robert Varga
56 public class ConnectingClientConnectionTest {
// Minimal RequestFailure subclass used as the failure type throughout this test. The constructor forwards
// the target and cause to the superclass, passing a fixed 0 as the second argument.
// NOTE(review): the bodies of externalizableProxy() and cloneAsVersion() are not visible in this extract.
57 private static class MockFailure extends RequestFailure<WritableIdentifier, MockFailure> {
58 private static final long serialVersionUID = 1L;
60 MockFailure(final WritableIdentifier target, final RequestException cause) {
61 super(target, 0, cause);
65 protected AbstractRequestFailureProxy<WritableIdentifier, MockFailure> externalizableProxy(
66 final ABIVersion version) {
71 protected MockFailure cloneAsVersion(final ABIVersion version) {
// Minimal Request subclass used as the request type throughout this test. The constructor forwards the
// target and reply-to actor to the superclass, passing a fixed 0 as the second argument.
// NOTE(review): the bodies of externalizableProxy() and cloneAsVersion() are not visible in this extract.
76 private static class MockRequest extends Request<WritableIdentifier, MockRequest> {
77 private static final long serialVersionUID = 1L;
79 MockRequest(final WritableIdentifier target, final ActorRef replyTo) {
80 super(target, 0, replyTo);
// Converts this request into a MockFailure that carries this request's target and the supplied cause.
84 public RequestFailure<WritableIdentifier, ?> toRequestFailure(final RequestException cause) {
85 return new MockFailure(getTarget(), cause);
89 protected AbstractRequestProxy<WritableIdentifier, MockRequest> externalizableProxy(final ABIVersion version) {
94 protected MockRequest cloneAsVersion(final ABIVersion version) {
// Collaborators that setup() never assigns explicitly.
// NOTE(review): presumably Mockito @Mock fields populated by MockitoAnnotations.initMocks(this) in setup();
// the annotation lines are not visible in this extract - confirm.
100 private ActorRef mockReplyTo;
102 private WritableIdentifier mockIdentifier;
104 private RequestException mockCause;
106 private Consumer<Response<?, ?>> mockCallback;
108 private ClientActorBehavior<?> mockBehavior;
110 private ClientActorContext mockContext;
// Fixtures that setup() constructs explicitly before every test.
112 private FakeTicker ticker;
113 private BackendInfo mockBackendInfo;
114 private MockRequest mockRequest;
115 private MockRequest mockRequest2;
116 private RequestFailure<WritableIdentifier, ?> mockResponse;
117 private FailureEnvelope mockResponseEnvelope;
118 private Long mockCookie;
// Actor system created in setupClass() and terminated in teardownClass().
120 private static ActorSystem actorSystem;
// Probe created in setup() and stopped in teardown(); its ActorRef is handed to BackendInfo.
121 private TestProbe mockActor;
// Connection under test; setup() assigns a ConnectingClientConnection to it.
123 private AbstractClientConnection<?> queue;
// Creates the ActorSystem and stores it in the static actorSystem field.
// NOTE(review): presumably annotated @BeforeClass; the annotation line is not visible in this extract.
126 public static void setupClass() {
127 actorSystem = ActorSystem.apply();
// Requests termination of the ActorSystem created in setupClass(); the value returned by terminate() is
// not used here.
// NOTE(review): presumably annotated @AfterClass; the annotation line is not visible in this extract.
131 public static void teardownClass() {
132 actorSystem.terminate();
// Builds the per-test fixture: initialises Mockito mocks, stubs the callback and context, creates the
// probe-backed BackendInfo plus the request/response objects, and finally the connection under test.
// NOTE(review): presumably annotated @Before; the annotation line is not visible in this extract.
136 public void setup() {
137 MockitoAnnotations.initMocks(this);
// The callback accepts any MockFailure without side effects.
139 doNothing().when(mockCallback).accept(any(MockFailure.class));
// Start the fake ticker at a random value; the context hands this ticker to code that asks for it.
141 ticker = new FakeTicker();
142 ticker.advance(ThreadLocalRandom.current().nextLong());
143 doReturn(ticker).when(mockContext).ticker();
145 final ClientActorConfig mockConfig = AccessClientUtil.newMockClientActorConfig();
146 doReturn(mockConfig).when(mockContext).config();
// The backend is represented by a TestProbe so that transmitted messages can be inspected.
148 mockActor = TestProbe.apply(actorSystem);
149 mockBackendInfo = new BackendInfo(mockActor.ref(), 0, ABIVersion.current(), 5);
150 mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
151 mockRequest2 = new MockRequest(mockIdentifier, mockReplyTo);
// The canned response is the failure derived from mockRequest, wrapped in an envelope.
152 mockResponse = mockRequest.toRequestFailure(mockCause);
153 mockResponseEnvelope = new FailureEnvelope(mockResponse, 0, 0, 0);
154 mockCookie = ThreadLocalRandom.current().nextLong();
156 queue = new ConnectingClientConnection<>(mockContext, mockCookie);
// Stops the TestProbe actor created in setup().
// NOTE(review): presumably annotated @After; the annotation line is not visible in this extract.
160 public void teardown() {
161 actorSystem.stop(mockActor.ref());
// Checks that the connection reports the cookie it was constructed with in setup().
165 public void testCookie() {
166 assertEquals(mockCookie, queue.cookie());
// Sends a request, poisons the connection, and checks that the callback received a MockFailure whose
// cause is the very same exception instance that was passed to poison().
170 public void testPoison() {
171 queue.sendRequest(mockRequest, mockCallback);
172 queue.poison(mockCause);
174 final ArgumentCaptor<MockFailure> captor = ArgumentCaptor.forClass(MockFailure.class);
175 verify(mockCallback).accept(captor.capture());
176 assertSame(mockCause, captor.getValue().getCause());
// Poisons the connection and then tries to send a request; the test passes only if an
// IllegalStateException is thrown.
179 @Test(expected = IllegalStateException.class)
180 public void testPoisonPerformsClose() {
182 queue.poison(mockCause);
185 queue.sendRequest(mockRequest, mockCallback);
// Poisons the connection twice with the same cause. No assertion is visible, so the check appears to be
// that the second call does not throw.
189 public void testPoisonIdempotent() {
190 queue.poison(mockCause);
191 queue.poison(mockCause);
// Sends a request on the connection as built by setup(), then checks that checkTimeout() at the
// current ticker reading returns a present Optional.
195 public void testSendRequestNeedsBackend() {
196 queue.sendRequest(mockRequest, mockCallback);
197 final Optional<Long> ret = queue.checkTimeout(ticker.read());
199 assertTrue(ret.isPresent());
// NOTE(review): the statement the inline comment refers to is not visible in this extract; presumably it
// is a call to setupBackend() - confirm.
203 public void testSetBackendWithNoRequests() {
204 // this utility method covers the entire test
// Sends a request, checks that checkTimeout() returns a present Optional, and then expects the request
// to have reached the probe actor with transmit sequence 0.
// NOTE(review): a statement before sendRequest() is not visible here; presumably setupBackend() - confirm.
209 public void testSendRequestNeedsTimer() {
212 queue.sendRequest(mockRequest, mockCallback);
213 final Optional<Long> ret = queue.checkTimeout(ticker.read());
215 assertTrue(ret.isPresent());
216 assertTransmit(mockRequest, 0);
// With no request sent, checkTimeout() at the current ticker reading must return an empty Optional.
220 public void testRunTimeoutEmpty() throws NoProgressException {
221 Optional<Long> ret = queue.checkTimeout(ticker.read());
223 assertFalse(ret.isPresent());
// Sends a request and checks the timeout without advancing the ticker; the result must be present.
227 public void testRunTimeoutWithoutShift() throws NoProgressException {
228 queue.sendRequest(mockRequest, mockCallback);
229 Optional<Long> ret = queue.checkTimeout(ticker.read());
231 assertTrue(ret.isPresent());
// Sends a request, advances the ticker to one nanosecond short of DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS,
// and checks that checkTimeout() still returns a present Optional.
235 public void testRunTimeoutWithTimeoutLess() throws NoProgressException {
236 queue.sendRequest(mockRequest, mockCallback);
238 ticker.advance(AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS - 1);
240 Optional<Long> ret = queue.checkTimeout(ticker.read());
242 assertTrue(ret.isPresent());
// Sends a request, advances the ticker by exactly DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS, and evaluates
// checkTimeout() at the new ticker reading.
// NOTE(review): a statement before sendRequest() and the assertion on ret are not visible in this extract.
246 public void testRunTimeoutWithTimeoutExact() throws NoProgressException {
249 queue.sendRequest(mockRequest, mockCallback);
251 ticker.advance(AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS);
253 Optional<Long> ret = queue.checkTimeout(ticker.read());
// Sends a request, advances the ticker one nanosecond past DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS, and
// evaluates checkTimeout() at the new ticker reading.
// NOTE(review): a statement before sendRequest() and the assertion on ret are not visible in this extract.
258 public void testRunTimeoutWithTimeoutMore() throws NoProgressException {
261 queue.sendRequest(mockRequest, mockCallback);
263 ticker.advance(AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS + 1);
265 Optional<Long> ret = queue.checkTimeout(ticker.read());
// Sends a request, advances the ticker by exactly DEFAULT_NO_PROGRESS_TIMEOUT_NANOS, runs the timer, and
// checks that the connection then reports a non-null poisoned() value.
// The warnings are suppressed because mockBehavior is passed through a raw ClientActorBehavior cast.
269 @SuppressWarnings({ "rawtypes", "unchecked" })
270 public void testRunTimeoutWithoutProgressExact() throws NoProgressException {
271 queue.sendRequest(mockRequest, mockCallback);
273 ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS);
276 queue.runTimer((ClientActorBehavior) mockBehavior);
277 assertNotNull(queue.poisoned());
// Sends a request, advances the ticker one nanosecond past DEFAULT_NO_PROGRESS_TIMEOUT_NANOS, runs the
// timer, and checks that the connection then reports a non-null poisoned() value.
// The warnings are suppressed because mockBehavior is passed through a raw ClientActorBehavior cast.
280 @SuppressWarnings({ "rawtypes", "unchecked" })
281 public void testRunTimeoutWithoutProgressMore() throws NoProgressException {
282 queue.sendRequest(mockRequest, mockCallback);
284 ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS + 1);
287 queue.runTimer((ClientActorBehavior) mockBehavior);
288 assertNotNull(queue.poisoned());
// With no request sent, advancing the ticker by exactly DEFAULT_NO_PROGRESS_TIMEOUT_NANOS must still
// leave checkTimeout() returning an empty Optional.
292 public void testRunTimeoutEmptyWithoutProgressExact() throws NoProgressException {
293 ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS);
296 Optional<Long> ret = queue.checkTimeout(ticker.read());
298 assertFalse(ret.isPresent());
// With no request sent, advancing the ticker one nanosecond past DEFAULT_NO_PROGRESS_TIMEOUT_NANOS must
// still leave checkTimeout() returning an empty Optional.
302 public void testRunTimeoutEmptyWithoutProgressMore() throws NoProgressException {
303 ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS + 1);
306 Optional<Long> ret = queue.checkTimeout(ticker.read());
308 assertFalse(ret.isPresent());
// Delivers a response envelope while no request has been sent and checks that the callback saw no
// interactions.
312 public void testCompleteEmpty() {
313 queue.receiveResponse(mockResponseEnvelope);
314 verifyNoMoreInteractions(mockCallback);
// Sends one request, delivers its response, and checks the callback received mockResponse. Delivering
// the same envelope a second time must not cause any further callback interaction.
// NOTE(review): a statement before sendRequest() is not visible here; presumably setupBackend() - confirm.
318 public void testCompleteSingle() {
321 queue.sendRequest(mockRequest, mockCallback);
323 queue.receiveResponse(mockResponseEnvelope);
324 verify(mockCallback).accept(mockResponse);
326 queue.receiveResponse(mockResponseEnvelope);
327 verifyNoMoreInteractions(mockCallback);
// Sends one request, explicitly stubs the callback to do nothing for mockResponse, delivers the response,
// and checks the callback received mockResponse.
// NOTE(review): a statement before sendRequest() is not visible here; presumably setupBackend() - confirm.
331 public void testCompleteNull() {
334 queue.sendRequest(mockRequest, mockCallback);
336 doNothing().when(mockCallback).accept(mockResponse);
338 queue.receiveResponse(mockResponseEnvelope);
339 verify(mockCallback).accept(mockResponse);
// Sends two requests, delivers one response, advances the ticker to 11 nanoseconds short of
// DEFAULT_NO_PROGRESS_TIMEOUT_NANOS, and evaluates checkTimeout() at the new ticker reading.
// NOTE(review): statements between the visible lines (including any assertion on ret) are not visible in
// this extract.
343 public void testProgressRecord() throws NoProgressException {
346 queue.sendRequest(mockRequest, mockCallback);
349 queue.sendRequest(mockRequest2, mockCallback);
350 queue.receiveResponse(mockResponseEnvelope);
352 ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS - 11);
354 Optional<Long> ret = queue.checkTimeout(ticker.read());
// Creates a ConnectedClientConnection from the mock context and cookie and installs a
// SimpleReconnectForwarder wrapping it as the forwarder of the connection under test.
// NOTE(review): the remaining constructor arguments continue on a line not visible in this extract.
358 private void setupBackend() {
359 final ConnectedClientConnection<?> newConn = new ConnectedClientConnection<>(mockContext, mockCookie,
361 queue.setForwarder(new SimpleReconnectForwarder(newConn));
// Asserts that the probe actor has a message available, then receives one message (waiting at most five
// seconds) and checks it against the expected request and transmit sequence via assertRequestEquals().
365 private void assertTransmit(final Request<?, ?> expected, final long sequence) {
366 assertTrue(mockActor.msgAvailable());
367 assertRequestEquals(expected, sequence, mockActor.receiveOne(FiniteDuration.apply(5, TimeUnit.SECONDS)));
// Asserts that obj is a RequestEnvelope with session id 0, the given transmit sequence, and a message
// that is the same instance as the expected request.
370 private static void assertRequestEquals(final Request<?, ?> expected, final long sequence, final Object obj) {
371 assertTrue(obj instanceof RequestEnvelope);
373 final RequestEnvelope actual = (RequestEnvelope) obj;
374 assertEquals(0, actual.getSessionId());
375 assertEquals(sequence, actual.getTxSequence());
376 assertSame(expected, actual.getMessage());