import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import akka.actor.ActorSystem;
import akka.testkit.TestProbe;
import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
+import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.common.actor.TestTicker;
import org.opendaylight.yangtools.concepts.WritableIdentifier;
import scala.concurrent.duration.FiniteDuration;
/**
- * Test suite covering logic contained in {@link SequencedQueue}. It assumes {@link SequencedQueueEntryTest} passes.
+ * Test suite covering logic contained in {@link ConnectingClientConnection}. It assumes {@link ConnectionEntryTest}
+ * passes.
*
* @author Robert Varga
*/
-public class SequencedQueueTest {
+public class ConnectingClientConnectionTest {
private static class MockFailure extends RequestFailure<WritableIdentifier, MockFailure> {
private static final long serialVersionUID = 1L;
@Mock
private RequestException mockCause;
@Mock
- private RequestCallback mockCallback;
+ private Consumer<Response<?, ?>> mockCallback;
@Mock
- private ClientActorBehavior mockBehavior;
+ private ClientActorBehavior<?> mockBehavior;
+ @Mock
+ private ClientActorContext mockContext;
private TestTicker ticker;
private BackendInfo mockBackendInfo;
private static ActorSystem actorSystem;
private TestProbe mockActor;
- private SequencedQueue queue;
+ private AbstractClientConnection<?> queue;
@BeforeClass
public static void setupClass() {
public void setup() {
MockitoAnnotations.initMocks(this);
- doReturn(mockBehavior).when(mockCallback).complete(any(MockFailure.class));
+ doNothing().when(mockCallback).accept(any(MockFailure.class));
ticker = new TestTicker();
ticker.increment(ThreadLocalRandom.current().nextLong());
+ doReturn(ticker).when(mockContext).ticker();
mockActor = TestProbe.apply(actorSystem);
mockBackendInfo = new BackendInfo(mockActor.ref(), 0, ABIVersion.current(), 5);
mockResponseEnvelope = new FailureEnvelope(mockResponse, 0, 0);
mockCookie = ThreadLocalRandom.current().nextLong();
- queue = new SequencedQueue(mockCookie, ticker);
+ queue = new ConnectingClientConnection<>(mockContext, mockCookie);
}
@After
}
@Test
- public void testGetCookie() {
- assertSame(mockCookie, queue.getCookie());
- }
-
- @Test
- public void testEmptyClose() {
- assertFalse(queue.hasCompleted());
- queue.close();
- assertTrue(queue.hasCompleted());
- }
-
- @Test(expected = IllegalStateException.class)
- public void testClosedEnqueueRequest() {
- queue.close();
-
- // Kaboom
- queue.enqueueRequest(mockRequest, mockCallback);
- }
-
- @Test
- public void testCloseIdempotent() {
- queue.close();
- queue.close();
+ public void testCookie() {
+ assertSame(mockCookie, queue.cookie());
}
@Test
public void testPoison() {
- queue.enqueueRequest(mockRequest, mockCallback);
+ queue.sendRequest(mockRequest, mockCallback);
queue.poison(mockCause);
final ArgumentCaptor<MockFailure> captor = ArgumentCaptor.forClass(MockFailure.class);
- verify(mockCallback).complete(captor.capture());
+ verify(mockCallback).accept(captor.capture());
assertSame(mockCause, captor.getValue().getCause());
}
queue.poison(mockCause);
// Kaboom
- queue.enqueueRequest(mockRequest, mockCallback);
+ queue.sendRequest(mockRequest, mockCallback);
}
@Test
}
@Test
- public void testEnqueueRequestNeedsBackend() {
- final Optional<FiniteDuration> ret = queue.enqueueRequest(mockRequest, mockCallback);
-
+ public void testSendRequestNeedsBackend() {
+ queue.sendRequest(mockRequest, mockCallback);
+ final Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
assertNotNull(ret);
- assertFalse(ret.isPresent());
- }
-
- @Test
- public void testExpectProof() {
- final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
- assertTrue(queue.expectProof(proof));
- assertFalse(queue.expectProof(proof));
- }
-
- @Test(expected = NullPointerException.class)
- public void testSetBackendNull() {
- final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
- assertTrue(queue.expectProof(proof));
- queue.setBackendInfo(proof, null);
- }
-
- @Test
- public void testSetBackendWithNoResolution() {
- queue.enqueueRequest(mockRequest, mockCallback);
-
- final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
- final Optional<FiniteDuration> ret = queue.setBackendInfo(proof, mockBackendInfo);
- assertNotNull(ret);
- assertFalse(ret.isPresent());
- }
-
- @Test
- public void testSetBackendWithWrongProof() {
- queue.enqueueRequest(mockRequest, mockCallback);
-
- final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
- assertTrue(queue.expectProof(proof));
-
- final Optional<FiniteDuration> ret = queue.setBackendInfo(new CompletableFuture<>(), mockBackendInfo);
- assertNotNull(ret);
- assertFalse(ret.isPresent());
+ assertTrue(ret.isPresent());
}
@Test
}
@Test
- public void testSetBackendWithRequestsNoTimer() {
- queue.enqueueRequest(mockRequest, mockCallback);
-
- final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
- assertTrue(queue.expectProof(proof));
- assertFalse(mockActor.msgAvailable());
-
- final Optional<FiniteDuration> ret = queue.setBackendInfo(proof, mockBackendInfo);
- assertNotNull(ret);
- assertTrue(ret.isPresent());
-
- assertTransmit(mockRequest, 0);
- }
-
- @Test
- public void testEnqueueRequestNeedsTimer() {
+ public void testSendRequestNeedsTimer() {
setupBackend();
- final Optional<FiniteDuration> ret = queue.enqueueRequest(mockRequest, mockCallback);
+ queue.sendRequest(mockRequest, mockCallback);
+ final Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
assertNotNull(ret);
assertTrue(ret.isPresent());
assertTransmit(mockRequest, 0);
}
- @Test
- public void testEnqueueRequestWithoutTimer() {
- setupBackend();
-
- // First request
- Optional<FiniteDuration> ret = queue.enqueueRequest(mockRequest, mockCallback);
- assertNotNull(ret);
- assertTrue(ret.isPresent());
- assertTransmit(mockRequest, 0);
-
- // Second request, no timer fired
- ret = queue.enqueueRequest(mockRequest2, mockCallback);
- assertNull(ret);
- assertTransmit(mockRequest2, 1);
- }
-
@Test
public void testRunTimeoutEmpty() throws NoProgressException {
- final boolean ret = queue.runTimeout();
- assertFalse(ret);
+ Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
+ assertNotNull(ret);
+ assertFalse(ret.isPresent());
}
@Test
public void testRunTimeoutWithoutShift() throws NoProgressException {
- queue.enqueueRequest(mockRequest, mockCallback);
- final boolean ret = queue.runTimeout();
- assertFalse(ret);
+ queue.sendRequest(mockRequest, mockCallback);
+ Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
+ assertNotNull(ret);
+ assertTrue(ret.isPresent());
}
@Test
public void testRunTimeoutWithTimeoutLess() throws NoProgressException {
- queue.enqueueRequest(mockRequest, mockCallback);
+ queue.sendRequest(mockRequest, mockCallback);
- ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS - 1);
+ ticker.increment(AbstractClientConnection.REQUEST_TIMEOUT_NANOS - 1);
- final boolean ret = queue.runTimeout();
- assertFalse(ret);
+ Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
+ assertNotNull(ret);
+ assertTrue(ret.isPresent());
}
@Test
public void testRunTimeoutWithTimeoutExact() throws NoProgressException {
setupBackend();
- queue.enqueueRequest(mockRequest, mockCallback);
+ queue.sendRequest(mockRequest, mockCallback);
- ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS);
+ ticker.increment(AbstractClientConnection.REQUEST_TIMEOUT_NANOS);
- final boolean ret = queue.runTimeout();
- assertTrue(ret);
+ Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
+ assertNull(ret);
}
@Test
public void testRunTimeoutWithTimeoutMore() throws NoProgressException {
setupBackend();
- queue.enqueueRequest(mockRequest, mockCallback);
+ queue.sendRequest(mockRequest, mockCallback);
- ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS + 1);
+ ticker.increment(AbstractClientConnection.REQUEST_TIMEOUT_NANOS + 1);
- final boolean ret = queue.runTimeout();
- assertTrue(ret);
+ Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
+ assertNull(ret);
}
- @Test(expected = NoProgressException.class)
+ @SuppressWarnings({ "rawtypes", "unchecked" })
public void testRunTimeoutWithoutProgressExact() throws NoProgressException {
- queue.enqueueRequest(mockRequest, mockCallback);
+ queue.sendRequest(mockRequest, mockCallback);
- ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS);
+ ticker.increment(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS);
// Kaboom
- queue.runTimeout();
+ queue.runTimer((ClientActorBehavior) mockBehavior);
+ assertNotNull(queue.poisoned());
}
- @Test(expected = NoProgressException.class)
+ @SuppressWarnings({ "rawtypes", "unchecked" })
public void testRunTimeoutWithoutProgressMore() throws NoProgressException {
- queue.enqueueRequest(mockRequest, mockCallback);
+ queue.sendRequest(mockRequest, mockCallback);
- ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS + 1);
+ ticker.increment(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS + 1);
// Kaboom
- queue.runTimeout();
+ queue.runTimer((ClientActorBehavior) mockBehavior);
+ assertNotNull(queue.poisoned());
}
@Test
public void testRunTimeoutEmptyWithoutProgressExact() throws NoProgressException {
- ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS);
+ ticker.increment(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS);
// No problem
- final boolean ret = queue.runTimeout();
- assertFalse(ret);
+ Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
+ assertNotNull(ret);
+ assertFalse(ret.isPresent());
}
@Test
public void testRunTimeoutEmptyWithoutProgressMore() throws NoProgressException {
- ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS + 1);
+ ticker.increment(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS + 1);
// No problem
- final boolean ret = queue.runTimeout();
- assertFalse(ret);
+ Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
+ assertNotNull(ret);
+ assertFalse(ret.isPresent());
}
@Test
public void testCompleteEmpty() {
- final ClientActorBehavior ret = queue.complete(mockBehavior, mockResponseEnvelope);
- assertSame(mockBehavior, ret);
+ queue.receiveResponse(mockResponseEnvelope);
verifyNoMoreInteractions(mockCallback);
}
public void testCompleteSingle() {
setupBackend();
- queue.enqueueRequest(mockRequest, mockCallback);
+ queue.sendRequest(mockRequest, mockCallback);
- ClientActorBehavior ret = queue.complete(mockBehavior, mockResponseEnvelope);
- verify(mockCallback).complete(mockResponse);
- assertSame(mockBehavior, ret);
+ queue.receiveResponse(mockResponseEnvelope);
+ verify(mockCallback).accept(mockResponse);
- ret = queue.complete(mockBehavior, mockResponseEnvelope);
- assertSame(mockBehavior, ret);
+ queue.receiveResponse(mockResponseEnvelope);
verifyNoMoreInteractions(mockCallback);
}
public void testCompleteNull() {
setupBackend();
- queue.enqueueRequest(mockRequest, mockCallback);
+ queue.sendRequest(mockRequest, mockCallback);
- doReturn(null).when(mockCallback).complete(mockResponse);
+ doNothing().when(mockCallback).accept(mockResponse);
- ClientActorBehavior ret = queue.complete(mockBehavior, mockResponseEnvelope);
- verify(mockCallback).complete(mockResponse);
- assertNull(ret);
+ queue.receiveResponse(mockResponseEnvelope);
+ verify(mockCallback).accept(mockResponse);
}
@Test
public void testProgressRecord() throws NoProgressException {
setupBackend();
- queue.enqueueRequest(mockRequest, mockCallback);
+ queue.sendRequest(mockRequest, mockCallback);
ticker.increment(10);
- queue.enqueueRequest(mockRequest2, mockCallback);
- queue.complete(mockBehavior, mockResponseEnvelope);
+ queue.sendRequest(mockRequest2, mockCallback);
+ queue.receiveResponse(mockResponseEnvelope);
- ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS - 11);
- assertTrue(queue.runTimeout());
+ ticker.increment(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS - 11);
+
+ Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
+ assertNull(ret);
}
private void setupBackend() {
- final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
- assertTrue(queue.expectProof(proof));
- final Optional<FiniteDuration> ret = queue.setBackendInfo(proof, mockBackendInfo);
- assertNotNull(ret);
- assertFalse(ret.isPresent());
- assertFalse(mockActor.msgAvailable());
+ final ConnectedClientConnection<?> newConn = new ConnectedClientConnection<>(mockContext, mockCookie,
+ mockBackendInfo);
+ queue.setForwarder(new SimpleReconnectForwarder(newConn));
+ queue = newConn;
}
private void assertTransmit(final Request<?, ?> expected, final long sequence) {