2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.access.client;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Matchers.any;
17 import static org.mockito.Mockito.doNothing;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.verify;
20 import static org.mockito.Mockito.verifyNoMoreInteractions;
22 import akka.actor.ActorRef;
23 import akka.actor.ActorSystem;
24 import akka.testkit.TestProbe;
25 import com.google.common.testing.FakeTicker;
26 import java.util.Optional;
27 import java.util.concurrent.ThreadLocalRandom;
28 import java.util.concurrent.TimeUnit;
29 import java.util.function.Consumer;
30 import org.junit.After;
31 import org.junit.AfterClass;
32 import org.junit.Before;
33 import org.junit.BeforeClass;
34 import org.junit.Test;
35 import org.mockito.ArgumentCaptor;
36 import org.mockito.Mock;
37 import org.mockito.MockitoAnnotations;
38 import org.opendaylight.controller.cluster.access.ABIVersion;
39 import org.opendaylight.controller.cluster.access.concepts.AbstractRequestFailureProxy;
40 import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy;
41 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
42 import org.opendaylight.controller.cluster.access.concepts.Request;
43 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
44 import org.opendaylight.controller.cluster.access.concepts.RequestException;
45 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
46 import org.opendaylight.controller.cluster.access.concepts.Response;
47 import org.opendaylight.yangtools.concepts.WritableIdentifier;
48 import scala.concurrent.duration.FiniteDuration;
51 * Test suite covering logic contained in {@link ConnectingClientConnection}. It assumes {@link ConnectionEntryTest}
54 * @author Robert Varga
56 public class ConnectingClientConnectionTest {
// Minimal RequestFailure subclass; instances are produced by MockRequest.toRequestFailure() in this test.
// NOTE(review): the bodies of externalizableProxy()/cloneAsVersion() and the closing braces are not visible
// in this view — confirm against the full file.
57 private static class MockFailure extends RequestFailure<WritableIdentifier, MockFailure> {
58 private static final long serialVersionUID = 1L;
// Forwards target and cause to the superclass, with a fixed value of 0 as the second argument.
60 MockFailure(final WritableIdentifier target, final RequestException cause) {
61 super(target, 0, cause);
65 protected AbstractRequestFailureProxy<WritableIdentifier, MockFailure> externalizableProxy(
66 final ABIVersion version) {
71 protected MockFailure cloneAsVersion(final ABIVersion version) {
// Minimal Request subclass used as the message sent through the connection under test.
// NOTE(review): the bodies of externalizableProxy()/cloneAsVersion() and the closing braces are not visible
// in this view — confirm against the full file.
76 private static class MockRequest extends Request<WritableIdentifier, MockRequest> {
77 private static final long serialVersionUID = 1L;
// Forwards target and reply-to actor to the superclass, with a fixed value of 0 as the second argument.
79 MockRequest(final WritableIdentifier target, final ActorRef replyTo) {
80 super(target, 0, replyTo);
// Wraps the given cause in a MockFailure addressed to this request's target.
84 public RequestFailure<WritableIdentifier, ?> toRequestFailure(final RequestException cause) {
85 return new MockFailure(getTarget(), cause);
89 protected AbstractRequestProxy<WritableIdentifier, MockRequest> externalizableProxy(final ABIVersion version) {
94 protected MockRequest cloneAsVersion(final ABIVersion version) {
// Test fixture state.
// The fields from mockReplyTo through mockContext are never assigned in setup(); presumably they are
// Mockito mocks injected by MockitoAnnotations.initMocks(this) — the annotations are not visible here, TODO confirm.
100 private ActorRef mockReplyTo;
102 private WritableIdentifier mockIdentifier;
104 private RequestException mockCause;
106 private Consumer<Response<?, ?>> mockCallback;
108 private ClientActorBehavior<?> mockBehavior;
110 private ClientActorContext mockContext;
// Fake time source handed out by mockContext.ticker(); tests advance it to simulate elapsed time.
112 private FakeTicker ticker;
// The remaining instance fields are real objects constructed in setup().
113 private BackendInfo mockBackendInfo;
114 private MockRequest mockRequest;
115 private MockRequest mockRequest2;
116 private RequestFailure<WritableIdentifier, ?> mockResponse;
117 private FailureEnvelope mockResponseEnvelope;
118 private Long mockCookie;
// Actor system shared by the whole class: created in setupClass(), terminated in teardownClass().
120 private static ActorSystem actorSystem;
// Probe actor whose ref is used as the backend actor in mockBackendInfo.
121 private TestProbe mockActor;
// Connection under test; setup() assigns a ConnectingClientConnection.
123 private AbstractClientConnection<?> queue;
// Creates the ActorSystem shared by the tests in this class.
126 public static void setupClass() {
127 actorSystem = ActorSystem.apply();
// Terminates the shared ActorSystem created in setupClass().
131 public static void teardownClass() {
132 actorSystem.terminate();
// Builds the test fixture: initializes the Mockito mocks, wires a fake ticker into the mocked context,
// creates the probe-backed BackendInfo, the two requests, the failure response and its envelope,
// and finally the ConnectingClientConnection under test.
136 public void setup() {
137 MockitoAnnotations.initMocks(this);
// Stub the callback so that accepting a MockFailure is an explicit no-op.
139 doNothing().when(mockCallback).accept(any(MockFailure.class));
141 ticker = new FakeTicker();
// Start the fake clock at a random offset — presumably so timeout logic is not tested only from zero; confirm.
142 ticker.advance(ThreadLocalRandom.current().nextLong());
143 doReturn(ticker).when(mockContext).ticker();
145 mockActor = TestProbe.apply(actorSystem);
146 mockBackendInfo = new BackendInfo(mockActor.ref(), 0, ABIVersion.current(), 5);
147 mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
148 mockRequest2 = new MockRequest(mockIdentifier, mockReplyTo);
// The canned response is the failure that mockRequest itself produces for mockCause.
149 mockResponse = mockRequest.toRequestFailure(mockCause);
150 mockResponseEnvelope = new FailureEnvelope(mockResponse, 0, 0, 0);
151 mockCookie = ThreadLocalRandom.current().nextLong();
153 queue = new ConnectingClientConnection<>(mockContext, mockCookie);
// Stops the probe actor created in setup().
157 public void teardown() {
158 actorSystem.stop(mockActor.ref());
// Verifies that the connection reports the cookie it was constructed with.
162 public void testCookie() {
163 assertEquals(mockCookie, queue.cookie());
// Verifies that poisoning the connection invokes the callback of a previously sent request with a
// MockFailure whose cause is the very same instance that was passed to poison().
167 public void testPoison() {
168 queue.sendRequest(mockRequest, mockCallback);
169 queue.poison(mockCause);
// Capture the argument handed to the callback so its cause can be inspected.
171 final ArgumentCaptor<MockFailure> captor = ArgumentCaptor.forClass(MockFailure.class);
172 verify(mockCallback).accept(captor.capture());
173 assertSame(mockCause, captor.getValue().getCause());
// Expects an IllegalStateException when a request is sent after the connection has been poisoned.
176 @Test(expected = IllegalStateException.class)
177 public void testPoisonPerformsClose() {
179 queue.poison(mockCause);
182 queue.sendRequest(mockRequest, mockCallback);
// Poisons the connection twice with the same cause; there are no assertions, so the test passes
// as long as the second call does not throw.
186 public void testPoisonIdempotent() {
187 queue.poison(mockCause);
188 queue.poison(mockCause);
// Sends a request without installing a backend and verifies that checkTimeout() at the current
// ticker reading returns a present value.
192 public void testSendRequestNeedsBackend() {
193 queue.sendRequest(mockRequest, mockCallback);
194 final Optional<Long> ret = queue.checkTimeout(ticker.read());
196 assertTrue(ret.isPresent());
// Exercises backend installation on a connection that has no queued requests.
// NOTE(review): the call to the utility method mentioned below is not visible in this view —
// presumably setupBackend(); confirm against the full file.
200 public void testSetBackendWithNoRequests() {
201 // this utility method covers the entire test
// Sends a request, verifies that checkTimeout() returns a present value, and that the request reached
// the probe actor with transmit sequence 0.
// NOTE(review): assertTransmit() expects a message at the probe actor, so a backend-setup step presumably
// precedes sendRequest() but is not visible in this view — confirm.
206 public void testSendRequestNeedsTimer() {
209 queue.sendRequest(mockRequest, mockCallback);
210 final Optional<Long> ret = queue.checkTimeout(ticker.read());
212 assertTrue(ret.isPresent());
213 assertTransmit(mockRequest, 0);
// Verifies that checkTimeout() on a connection with no requests returns an empty Optional.
217 public void testRunTimeoutEmpty() throws NoProgressException {
218 Optional<Long> ret = queue.checkTimeout(ticker.read());
220 assertFalse(ret.isPresent());
// Sends a request and, without advancing the ticker, verifies that checkTimeout() returns a present value.
224 public void testRunTimeoutWithoutShift() throws NoProgressException {
225 queue.sendRequest(mockRequest, mockCallback);
226 Optional<Long> ret = queue.checkTimeout(ticker.read());
228 assertTrue(ret.isPresent());
// Sends a request, advances the ticker to one nanosecond short of BACKEND_ALIVE_TIMEOUT_NANOS, and verifies
// that checkTimeout() still returns a present value.
232 public void testRunTimeoutWithTimeoutLess() throws NoProgressException {
233 queue.sendRequest(mockRequest, mockCallback);
235 ticker.advance(AbstractClientConnection.BACKEND_ALIVE_TIMEOUT_NANOS - 1);
237 Optional<Long> ret = queue.checkTimeout(ticker.read());
239 assertTrue(ret.isPresent());
// Sends a request, advances the ticker by exactly BACKEND_ALIVE_TIMEOUT_NANOS, and calls checkTimeout().
// NOTE(review): no assertion on ret is visible in this view, and a setup step may precede sendRequest() —
// confirm the expected outcome against the full file.
243 public void testRunTimeoutWithTimeoutExact() throws NoProgressException {
246 queue.sendRequest(mockRequest, mockCallback);
248 ticker.advance(AbstractClientConnection.BACKEND_ALIVE_TIMEOUT_NANOS);
250 Optional<Long> ret = queue.checkTimeout(ticker.read());
// Sends a request, advances the ticker one nanosecond past BACKEND_ALIVE_TIMEOUT_NANOS, and calls checkTimeout().
// NOTE(review): no assertion on ret is visible in this view, and a setup step may precede sendRequest() —
// confirm the expected outcome against the full file.
255 public void testRunTimeoutWithTimeoutMore() throws NoProgressException {
258 queue.sendRequest(mockRequest, mockCallback);
260 ticker.advance(AbstractClientConnection.BACKEND_ALIVE_TIMEOUT_NANOS + 1);
262 Optional<Long> ret = queue.checkTimeout(ticker.read());
// Sends a request, advances the ticker by exactly NO_PROGRESS_TIMEOUT_NANOS, runs the timer, and verifies
// that the connection reports a non-null poisoned() value afterwards.
266 @SuppressWarnings({ "rawtypes", "unchecked" })
267 public void testRunTimeoutWithoutProgressExact() throws NoProgressException {
268 queue.sendRequest(mockRequest, mockCallback);
270 ticker.advance(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS);
// The raw cast is what the rawtypes/unchecked suppression on this method is for.
273 queue.runTimer((ClientActorBehavior) mockBehavior);
274 assertNotNull(queue.poisoned());
// Sends a request, advances the ticker one nanosecond past NO_PROGRESS_TIMEOUT_NANOS, runs the timer, and
// verifies that the connection reports a non-null poisoned() value afterwards.
277 @SuppressWarnings({ "rawtypes", "unchecked" })
278 public void testRunTimeoutWithoutProgressMore() throws NoProgressException {
279 queue.sendRequest(mockRequest, mockCallback);
281 ticker.advance(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS + 1);
// The raw cast is what the rawtypes/unchecked suppression on this method is for.
284 queue.runTimer((ClientActorBehavior) mockBehavior);
285 assertNotNull(queue.poisoned());
// With no requests sent, advances the ticker by exactly NO_PROGRESS_TIMEOUT_NANOS and verifies that
// checkTimeout() returns an empty Optional.
289 public void testRunTimeoutEmptyWithoutProgressExact() throws NoProgressException {
290 ticker.advance(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS);
293 Optional<Long> ret = queue.checkTimeout(ticker.read());
295 assertFalse(ret.isPresent());
// With no requests sent, advances the ticker one nanosecond past NO_PROGRESS_TIMEOUT_NANOS and verifies that
// checkTimeout() returns an empty Optional.
299 public void testRunTimeoutEmptyWithoutProgressMore() throws NoProgressException {
300 ticker.advance(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS + 1);
303 Optional<Long> ret = queue.checkTimeout(ticker.read());
305 assertFalse(ret.isPresent());
// Delivers a response while no request has been sent and verifies the callback was never touched.
309 public void testCompleteEmpty() {
310 queue.receiveResponse(mockResponseEnvelope);
311 verifyNoMoreInteractions(mockCallback);
// Sends one request, delivers its response, and verifies the callback received mockResponse; then delivers
// the same envelope a second time and verifies the callback is not invoked again.
// NOTE(review): a setup step may precede sendRequest() but is not visible in this view — confirm.
315 public void testCompleteSingle() {
318 queue.sendRequest(mockRequest, mockCallback);
320 queue.receiveResponse(mockResponseEnvelope);
321 verify(mockCallback).accept(mockResponse);
// Duplicate delivery of the same envelope must not reach the callback again.
323 queue.receiveResponse(mockResponseEnvelope);
324 verifyNoMoreInteractions(mockCallback);
// Sends a request, explicitly stubs the callback to do nothing for mockResponse, delivers the response,
// and verifies the callback was invoked with mockResponse.
// NOTE(review): a setup step may precede sendRequest() but is not visible in this view — confirm.
328 public void testCompleteNull() {
331 queue.sendRequest(mockRequest, mockCallback);
333 doNothing().when(mockCallback).accept(mockResponse);
335 queue.receiveResponse(mockResponseEnvelope);
336 verify(mockCallback).accept(mockResponse);
// Sends two requests, delivers a response, advances the ticker to 11 nanoseconds short of
// NO_PROGRESS_TIMEOUT_NANOS, and calls checkTimeout().
// NOTE(review): no assertion on ret is visible in this view, and statements between the two sendRequest()
// calls may be missing (the "- 11" offset suggests an earlier small ticker advance) — confirm against the full file.
340 public void testProgressRecord() throws NoProgressException {
343 queue.sendRequest(mockRequest, mockCallback);
346 queue.sendRequest(mockRequest2, mockCallback);
347 queue.receiveResponse(mockResponseEnvelope);
349 ticker.advance(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS - 11);
351 Optional<Long> ret = queue.checkTimeout(ticker.read());
// Test helper: creates a ConnectedClientConnection for the same context and cookie and installs it on the
// connection under test through a SimpleReconnectForwarder.
// NOTE(review): the constructor call below is cut off after the cookie argument in this view; the remaining
// argument(s) are presumably the backend info — confirm.
355 private void setupBackend() {
356 final ConnectedClientConnection<?> newConn = new ConnectedClientConnection<>(mockContext, mockCookie,
358 queue.setForwarder(new SimpleReconnectForwarder(newConn));
// Test helper: asserts that the probe actor has a message available, then receives one message (waiting up to
// 5 seconds) and checks that it is an envelope for the expected request with the given sequence.
362 private void assertTransmit(final Request<?, ?> expected, final long sequence) {
363 assertTrue(mockActor.msgAvailable());
364 assertRequestEquals(expected, sequence, mockActor.receiveOne(FiniteDuration.apply(5, TimeUnit.SECONDS)));
// Test helper: asserts that obj is a RequestEnvelope with session id 0, the given transmit sequence, and
// a message that is the very same instance as the expected request.
367 private static void assertRequestEquals(final Request<?, ?> expected, final long sequence, final Object obj) {
// Type check first so the cast below cannot fail with a ClassCastException.
368 assertTrue(obj instanceof RequestEnvelope);
370 final RequestEnvelope actual = (RequestEnvelope) obj;
371 assertEquals(0, actual.getSessionId());
372 assertEquals(sequence, actual.getTxSequence());
// Identity, not equality: the envelope must carry the exact request object that was sent.
373 assertSame(expected, actual.getMessage());