2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.actors.client;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Matchers.any;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.verify;
19 import static org.mockito.Mockito.verifyNoMoreInteractions;
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSystem;
22 import akka.testkit.TestProbe;
23 import java.util.Optional;
24 import java.util.concurrent.CompletableFuture;
25 import java.util.concurrent.ThreadLocalRandom;
26 import java.util.concurrent.TimeUnit;
27 import org.junit.After;
28 import org.junit.AfterClass;
29 import org.junit.Before;
30 import org.junit.BeforeClass;
31 import org.junit.Test;
32 import org.mockito.ArgumentCaptor;
33 import org.mockito.Mock;
34 import org.mockito.MockitoAnnotations;
35 import org.opendaylight.controller.cluster.access.ABIVersion;
36 import org.opendaylight.controller.cluster.access.concepts.AbstractRequestFailureProxy;
37 import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy;
38 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
39 import org.opendaylight.controller.cluster.access.concepts.Request;
40 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
41 import org.opendaylight.controller.cluster.access.concepts.RequestException;
42 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
43 import org.opendaylight.controller.cluster.common.actor.TestTicker;
44 import org.opendaylight.yangtools.concepts.WritableIdentifier;
45 import scala.concurrent.duration.FiniteDuration;
48 * Test suite covering logic contained in {@link SequencedQueue}. It assumes {@link SequencedQueueEntryTest} passes.
50 * @author Robert Varga
52 public class SequencedQueueTest {
// Concrete RequestFailure subclass used as the failure type throughout this test class
// (it is what MockRequest.toRequestFailure() produces and what the callback stub matches on).
// NOTE(review): the constructor and method bodies are not visible in this excerpt.
53 private static class MockFailure extends RequestFailure<WritableIdentifier, MockFailure> {
54 private static final long serialVersionUID = 1L;
// Receives the target identifier and the exception acting as the failure cause.
56 MockFailure(final WritableIdentifier target, final RequestException cause) {
// Presumably overrides the superclass proxy hook — confirm; return value not visible here.
61 protected AbstractRequestFailureProxy<WritableIdentifier, MockFailure> externalizableProxy(final ABIVersion version) {
// Presumably overrides the superclass version-cloning hook — confirm; return value not visible here.
66 protected MockFailure cloneAsVersion(final ABIVersion version) {
// Concrete Request subclass used as the request type enqueued by the tests.
// NOTE(review): the bodies of externalizableProxy and cloneAsVersion are not visible in this excerpt.
71 private static class MockRequest extends Request<WritableIdentifier, MockRequest> {
72 private static final long serialVersionUID = 1L;
// Forwards the target identifier and reply-to actor to the Request constructor.
74 MockRequest(final WritableIdentifier target, final ActorRef replyTo) {
75 super(target, replyTo);
// Builds the failure for this request: a MockFailure carrying this request's target and the given cause.
79 public RequestFailure<WritableIdentifier, ?> toRequestFailure(final RequestException cause) {
80 return new MockFailure(getTarget(), cause);
// Presumably overrides the superclass proxy hook — confirm; return value not visible here.
84 protected AbstractRequestProxy<WritableIdentifier, MockRequest> externalizableProxy(final ABIVersion version) {
// Presumably overrides the superclass version-cloning hook — confirm; return value not visible here.
89 protected MockRequest cloneAsVersion(final ABIVersion version) {
// Test fixture state.
// The first five fields are never assigned in setup(); presumably they carry @Mock annotations
// (not visible in this excerpt) and are populated by MockitoAnnotations.initMocks(this) — confirm.
95 private ActorRef mockReplyTo;
97 private WritableIdentifier mockIdentifier;
99 private RequestException mockCause;
101 private RequestCallback mockCallback;
103 private ClientActorBehavior mockBehavior;
// The fields below are real objects created afresh in setup() for every test.
105 private TestTicker ticker;
106 private BackendInfo mockBackendInfo;
107 private MockRequest mockRequest;
108 private MockRequest mockRequest2;
109 private RequestFailure<WritableIdentifier, ?> mockResponse;
110 private FailureEnvelope mockResponseEnvelope;
111 private Long mockCookie;
// Actor system shared by all tests in the class; created in setupClass(), terminated in teardownClass().
113 private static ActorSystem actorSystem;
// Probe standing in for the backend actor; created in setup() and stopped in teardown().
114 private TestProbe mockActor;
// The object under test, constructed in setup() from mockCookie and ticker.
116 private SequencedQueue queue;
// Creates the ActorSystem shared by every test in this class.
// NOTE(review): presumably annotated @BeforeClass — the annotation line is not visible in this excerpt.
119 public static void setupClass() {
120 actorSystem = ActorSystem.apply();
// Terminates the shared ActorSystem created in setupClass().
// NOTE(review): presumably annotated @AfterClass — the annotation line is not visible in this excerpt.
124 public static void teardownClass() {
125 actorSystem.terminate();
// Builds a fresh fixture: initializes Mockito mocks, stubs the callback, and creates the ticker,
// backend probe, requests, canned failure response, cookie and the SequencedQueue under test.
// NOTE(review): presumably annotated @Before — the annotation line is not visible in this excerpt.
129 public void setup() {
130 MockitoAnnotations.initMocks(this);
// The callback returns mockBehavior whenever it is completed with any MockFailure.
132 doReturn(mockBehavior).when(mockCallback).complete(any(MockFailure.class));
// Advance the ticker by a random amount so tests do not depend on a particular starting reading.
134 ticker = new TestTicker();
135 ticker.increment(ThreadLocalRandom.current().nextLong());
// The probe's ActorRef serves as the backend actor inside BackendInfo.
137 mockActor = TestProbe.apply(actorSystem);
138 mockBackendInfo = new BackendInfo(mockActor.ref(), ABIVersion.current());
// Two distinct request instances sharing the same target and reply-to.
139 mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
140 mockRequest2 = new MockRequest(mockIdentifier, mockReplyTo);
// Canned failure response derived from mockRequest, wrapped in an envelope.
// NOTE(review): the meaning of the two 0 arguments is not established here — confirm against FailureEnvelope.
141 mockResponse = mockRequest.toRequestFailure(mockCause);
142 mockResponseEnvelope = new FailureEnvelope(mockResponse, 0, 0);
// Random cookie; boxed into a Long so testGetCookie can compare by identity.
143 mockCookie = ThreadLocalRandom.current().nextLong();
145 queue = new SequencedQueue(mockCookie, ticker);
// Stops the per-test probe actor created in setup().
// NOTE(review): presumably annotated @After — the annotation line is not visible in this excerpt.
149 public void teardown() {
150 actorSystem.stop(mockActor.ref());
// Verifies getCookie() returns the very same object instance that was passed to the constructor.
154 public void testGetCookie() {
155 assertSame(mockCookie, queue.getCookie());
// Verifies hasCompleted() is false on a fresh queue and true afterwards.
// NOTE(review): the statement between the two assertions is not visible in this excerpt —
// presumably queue.close(); confirm.
159 public void testEmptyClose() {
160 assertFalse(queue.hasCompleted());
162 assertTrue(queue.hasCompleted());
// Expects an IllegalStateException during the test; the only visible call is enqueueRequest().
// NOTE(review): the statements that precede the enqueue are not visible in this excerpt —
// presumably they close the queue; confirm.
165 @Test(expected=IllegalStateException.class)
166 public void testClosedEnqueueRequest() {
170 queue.enqueueRequest(0, mockRequest, mockCallback);
// NOTE(review): the body is not visible in this excerpt; judging by the name it presumably
// closes the queue more than once — confirm.
174 public void testCloseIdempotent() {
// Verifies that poisoning the queue completes the enqueued request's callback with a failure
// whose cause is the very exception instance passed to poison().
180 public void testPoison() {
181 queue.enqueueRequest(0, mockRequest, mockCallback);
182 queue.poison(mockCause);
// Capture the MockFailure handed to the callback so its cause can be inspected.
184 final ArgumentCaptor<MockFailure> captor = ArgumentCaptor.forClass(MockFailure.class);
185 verify(mockCallback).complete(captor.capture());
186 assertSame(mockCause, captor.getValue().getCause());
// Expects an IllegalStateException when a request is enqueued after the queue has been poisoned.
// NOTE(review): some lines of this method are not visible in this excerpt.
189 @Test(expected=IllegalStateException.class)
190 public void testPoisonPerformsClose() {
192 queue.poison(mockCause);
195 queue.enqueueRequest(0, mockRequest, mockCallback);
// Calls poison() twice with the same cause; the visible code contains no assertions, so the test
// passes as long as neither call throws.
199 public void testPoisonIdempotent() {
200 queue.poison(mockCause);
201 queue.poison(mockCause);
// Enqueues a request with no backend configured in the visible code and asserts that
// enqueueRequest() returns an empty Optional.
// NOTE(review): lines between the enqueue and the assertion are not visible in this excerpt.
205 public void testEnqueueRequestNeedsBackend() {
206 final Optional<FiniteDuration> ret = queue.enqueueRequest(0, mockRequest, mockCallback);
209 assertFalse(ret.isPresent());
// Verifies expectProof() returns true the first time a proof future is registered and false when
// the same future is registered again.
213 public void testExpectProof() {
214 final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
215 assertTrue(queue.expectProof(proof));
216 assertFalse(queue.expectProof(proof));
// Expects a NullPointerException when setBackendInfo() is called with a registered proof but a
// null BackendInfo.
219 @Test(expected=NullPointerException.class)
220 public void testSetBackendNull() {
221 final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
222 assertTrue(queue.expectProof(proof));
223 queue.setBackendInfo(proof, null);
// With a request enqueued, calls setBackendInfo() using a proof that was never registered through
// expectProof() and asserts the returned Optional is empty.
227 public void testSetBackendWithNoResolution() {
228 queue.enqueueRequest(0, mockRequest, mockCallback);
230 final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
231 final Optional<FiniteDuration> ret = queue.setBackendInfo(proof, mockBackendInfo);
233 assertFalse(ret.isPresent());
// With a request enqueued and one proof registered, calls setBackendInfo() with a different,
// freshly created future and asserts the returned Optional is empty.
237 public void testSetBackendWithWrongProof() {
238 queue.enqueueRequest(0, mockRequest, mockCallback);
240 final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
241 assertTrue(queue.expectProof(proof));
// Deliberately pass a future other than the registered proof.
243 final Optional<FiniteDuration> ret = queue.setBackendInfo(new CompletableFuture<>(), mockBackendInfo);
245 assertFalse(ret.isPresent());
// NOTE(review): the statement following the comment below is not visible in this excerpt;
// presumably it is a call to setupBackend() — confirm.
249 public void testSetBackendWithNoRequests() {
250 // this utility method covers the entire test
// Enqueues a request before any backend is known, then supplies the backend with the registered
// proof and checks that setBackendInfo() returns a non-empty Optional and that the request is
// transmitted to the backend probe with sequence 0.
// NOTE(review): the method name appears to contain a typo ("Setbacked" for "SetBackend").
255 public void testSetbackedWithRequestsNoTimer() {
256 queue.enqueueRequest(0, mockRequest, mockCallback);
258 final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
259 assertTrue(queue.expectProof(proof));
// Nothing may reach the backend probe before the backend has been set.
260 assertFalse(mockActor.msgAvailable());
262 final Optional<FiniteDuration> ret = queue.setBackendInfo(proof, mockBackendInfo);
264 assertTrue(ret.isPresent());
266 assertTransmit(mockRequest, 0);
// Asserts that enqueueRequest() returns a non-empty Optional and that the request is transmitted
// to the backend probe with sequence 0.
// NOTE(review): the lines before the enqueue are not visible in this excerpt; presumably they
// call setupBackend() so that a backend is available — confirm.
270 public void testEnqueueRequestNeedsTimer() {
273 final Optional<FiniteDuration> ret = queue.enqueueRequest(0, mockRequest, mockCallback);
275 assertTrue(ret.isPresent());
276 assertTransmit(mockRequest, 0);
// Enqueues two requests in a row: the first must return a non-empty Optional and be transmitted
// with sequence 0; the second is enqueued with sequence 1 and must be transmitted as such.
// NOTE(review): the lines before the first enqueue (presumably setupBackend()) and the assertion
// on the second return value are not visible in this excerpt — confirm.
280 public void testEnqueueRequestWithoutTimer() {
284 Optional<FiniteDuration> ret = queue.enqueueRequest(0, mockRequest, mockCallback);
286 assertTrue(ret.isPresent());
287 assertTransmit(mockRequest, 0);
289 // Second request, no timer fired
290 ret = queue.enqueueRequest(1, mockRequest2, mockCallback);
292 assertTransmit(mockRequest2, 1);
// Calls runTimeout() on a queue with nothing enqueued in the visible code.
// NOTE(review): the assertion on the result is not visible in this excerpt.
296 public void testRunTimeoutEmpty() throws NoProgressException {
297 final boolean ret = queue.runTimeout();
// Enqueues a request and calls runTimeout() without advancing the ticker.
// NOTE(review): the assertion on the result is not visible in this excerpt.
302 public void testRunTimeoutWithoutShift() throws NoProgressException {
303 queue.enqueueRequest(0, mockRequest, mockCallback);
304 final boolean ret = queue.runTimeout();
// Enqueues a request, advances the ticker to one nanosecond short of REQUEST_TIMEOUT_NANOS and
// calls runTimeout().
// NOTE(review): the assertion on the result is not visible in this excerpt.
309 public void testRunTimeoutWithTimeoutLess() throws NoProgressException {
310 queue.enqueueRequest(0, mockRequest, mockCallback);
312 ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS - 1);
314 final boolean ret = queue.runTimeout();
// Enqueues a request, advances the ticker by exactly REQUEST_TIMEOUT_NANOS and calls runTimeout().
// NOTE(review): the assertion on the result is not visible in this excerpt.
319 public void testRunTimeoutWithTimeoutExact() throws NoProgressException {
320 queue.enqueueRequest(0, mockRequest, mockCallback);
322 ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS);
324 final boolean ret = queue.runTimeout();
// Enqueues a request, advances the ticker one nanosecond past REQUEST_TIMEOUT_NANOS and calls
// runTimeout().
// NOTE(review): the assertion on the result is not visible in this excerpt.
329 public void testRunTimeoutWithTimeoutMore() throws NoProgressException {
330 queue.enqueueRequest(0, mockRequest, mockCallback);
332 ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS + 1);
334 final boolean ret = queue.runTimeout();
// Expects a NoProgressException after a request is enqueued and the ticker is advanced by exactly
// NO_PROGRESS_TIMEOUT_NANOS.
// NOTE(review): the call that triggers the exception is not visible in this excerpt —
// presumably queue.runTimeout(); confirm.
338 @Test(expected=NoProgressException.class)
339 public void testRunTimeoutWithoutProgressExact() throws NoProgressException {
340 queue.enqueueRequest(0, mockRequest, mockCallback);
342 ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS);
// Expects a NoProgressException after a request is enqueued and the ticker is advanced one
// nanosecond past NO_PROGRESS_TIMEOUT_NANOS.
// NOTE(review): the call that triggers the exception is not visible in this excerpt —
// presumably queue.runTimeout(); confirm.
348 @Test(expected=NoProgressException.class)
349 public void testRunTimeoutWithoutProgressMore() throws NoProgressException {
350 queue.enqueueRequest(0, mockRequest, mockCallback);
352 ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS + 1);
// With nothing enqueued in the visible code, advances the ticker by exactly
// NO_PROGRESS_TIMEOUT_NANOS and calls runTimeout(); no exception is expected by the signature's
// visible annotations.
// NOTE(review): the assertion on the result is not visible in this excerpt.
359 public void testRunTimeoutEmptyWithoutProgressExact() throws NoProgressException {
360 ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS);
363 final boolean ret = queue.runTimeout();
// With nothing enqueued in the visible code, advances the ticker one nanosecond past
// NO_PROGRESS_TIMEOUT_NANOS and calls runTimeout().
// NOTE(review): the assertion on the result is not visible in this excerpt.
368 public void testRunTimeoutEmptyWithoutProgressMore() throws NoProgressException {
369 ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS + 1);
372 final boolean ret = queue.runTimeout();
// Verifies that completing a response on a queue with nothing enqueued returns the behavior that
// was passed in and never touches the callback.
377 public void testCompleteEmpty() {
378 final ClientActorBehavior ret = queue.complete(mockBehavior, mockResponseEnvelope);
379 assertSame(mockBehavior, ret);
380 verifyNoMoreInteractions(mockCallback);
// Verifies that the first complete() for an enqueued request invokes the callback with the
// response, and that a second complete() with the same envelope does not invoke it again.
384 public void testCompleteSingle() {
385 queue.enqueueRequest(0, mockRequest, mockCallback);
// First completion: the callback receives mockResponse and the resulting behavior is mockBehavior.
387 ClientActorBehavior ret = queue.complete(mockBehavior, mockResponseEnvelope);
388 verify(mockCallback).complete(mockResponse);
389 assertSame(mockBehavior, ret);
// Second completion with the same envelope: behavior is returned, callback sees nothing further.
391 ret = queue.complete(mockBehavior, mockResponseEnvelope);
392 assertSame(mockBehavior, ret);
393 verifyNoMoreInteractions(mockCallback);
// Exercises complete() when the callback returns null for the response, and verifies the callback
// was invoked with that response.
// NOTE(review): the assertion on the returned behavior is not visible in this excerpt.
397 public void testCompleteNull() {
398 queue.enqueueRequest(0, mockRequest, mockCallback);
// Re-stub the callback so that it yields null for this particular response.
400 doReturn(null).when(mockCallback).complete(mockResponse);
402 ClientActorBehavior ret = queue.complete(mockBehavior, mockResponseEnvelope);
403 verify(mockCallback).complete(mockResponse);
// Enqueues two requests 10 ticks apart, completes a response, then advances the ticker so the
// total advance since the first enqueue is NO_PROGRESS_TIMEOUT_NANOS - 1 and asserts that
// runTimeout() returns true without throwing.
// NOTE(review): the lines before the first enqueue are not visible in this excerpt —
// presumably setupBackend(); confirm.
408 public void testProgressRecord() throws NoProgressException {
411 queue.enqueueRequest(0, mockRequest, mockCallback);
413 ticker.increment(10);
414 queue.enqueueRequest(1, mockRequest2, mockCallback);
415 queue.complete(mockBehavior, mockResponseEnvelope);
// 10 ticks were already consumed above, hence the "- 11" to land one tick short of the limit.
417 ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS - 11);
418 assertTrue(queue.runTimeout());
// Helper: registers a fresh proof with the queue and resolves it to mockBackendInfo, asserting
// that setBackendInfo() returns an empty Optional and that nothing was sent to the backend probe.
421 private void setupBackend() {
422 final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
423 assertTrue(queue.expectProof(proof));
424 final Optional<FiniteDuration> ret = queue.setBackendInfo(proof, mockBackendInfo);
426 assertFalse(ret.isPresent());
427 assertFalse(mockActor.msgAvailable());
// Helper: asserts the backend probe has a message waiting, receives it (waiting up to 5 seconds)
// and checks that it is an envelope carrying the expected request and sequence.
430 private void assertTransmit(final Request<?, ?> expected, final long sequence) {
431 assertTrue(mockActor.msgAvailable());
432 assertRequestEquals(expected, sequence, mockActor.receiveOne(FiniteDuration.apply(5, TimeUnit.SECONDS)));
// Helper: asserts that o is a RequestEnvelope with retry 0, the given sequence, and a message
// that is the very same instance as the expected request.
435 private static void assertRequestEquals(final Request<?, ?> expected, final long sequence, final Object o) {
436 assertTrue(o instanceof RequestEnvelope);
438 final RequestEnvelope actual = (RequestEnvelope) o;
439 assertEquals(0, actual.getRetry());
440 assertEquals(sequence, actual.getSequence());
// Identity, not equality: the queue must transmit the original request object.
441 assertSame(expected, actual.getMessage());