--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import java.util.Optional;
+import java.util.function.Consumer;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionFailure;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendType;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
+import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import scala.concurrent.duration.FiniteDuration;
+
+public abstract class AbstractClientConnectionTest<T extends AbstractClientConnection<U>, U extends BackendInfo> {
+
+ protected static final MemberName MEMBER_NAME = MemberName.forName("member-1");
+ protected static final FrontendType FRONTEND_TYPE =
+ FrontendType.forName(ClientActorContextTest.class.getSimpleName());
+ protected static final FrontendIdentifier FRONTEND_ID = FrontendIdentifier.create(MEMBER_NAME, FRONTEND_TYPE);
+ protected static final ClientIdentifier CLIENT_ID = ClientIdentifier.create(FRONTEND_ID, 0);
+ protected static final String PERSISTENCE_ID = "per-1";
+
+ protected T connection;
+ protected ClientActorContext context;
+ protected ActorSystem system;
+ protected TestProbe backendProbe;
+ protected TestProbe contextProbe;
+ protected TestProbe replyToProbe;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ system = ActorSystem.apply();
+ backendProbe = new TestProbe(system);
+ contextProbe = new TestProbe(system);
+ context = new ClientActorContext(contextProbe.ref(), system.scheduler(), system.dispatcher(),
+ PERSISTENCE_ID, CLIENT_ID);
+ replyToProbe = new TestProbe(system);
+ connection = createConnection();
+ }
+
+ protected abstract T createConnection();
+
+ @Test
+ public void testLocalActor() throws Exception {
+ Assert.assertEquals(contextProbe.ref(), connection.localActor());
+ }
+
+ @Test
+ public abstract void testReconnectConnection() throws Exception;
+
+ @Test
+ public void testPoison() throws Exception {
+ final Consumer<Response<?, ?>> callback = mock(Consumer.class);
+ final Request<?, ?> request = createRequest(replyToProbe.ref());
+ final ConnectionEntry entry = new ConnectionEntry(request, callback, 0L);
+ connection.enqueueEntry(entry, 0L);
+ connection.poison(new RuntimeRequestException("fail", new RuntimeException("fail")));
+ verify(callback, timeout(1000)).accept(isA(TransactionFailure.class));
+ }
+
+ @Test
+ public void testSendRequestReceiveResponse() throws Exception {
+ final Consumer<Response<?, ?>> callback = mock(Consumer.class);
+ final Request<?, ?> request = createRequest(replyToProbe.ref());
+ connection.sendRequest(request, callback);
+ final RequestEnvelope requestEnvelope = backendProbe.expectMsgClass(RequestEnvelope.class);
+ Assert.assertEquals(request, requestEnvelope.getMessage());
+ final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(CLIENT_ID, 0L);
+ final RequestSuccess<?, ?> message = new TransactionAbortSuccess(new TransactionIdentifier(historyId, 0L), 0L);
+ final ResponseEnvelope<?> envelope = new SuccessEnvelope(message, 0L, 0L, 0L);
+ connection.receiveResponse(envelope);
+ verify(callback, timeout(1000)).accept(isA(TransactionAbortSuccess.class));
+ }
+
+ @Test
+ public void testRun() throws Exception {
+ final ClientActorBehavior<U> behavior = mock(ClientActorBehavior.class);
+ Assert.assertSame(behavior, connection.runTimer(behavior));
+ }
+
+ @Test
+ public void testCheckTimeoutEmptyQueue() throws Exception {
+ final Optional<FiniteDuration> timeout = connection.checkTimeout(context.ticker().read());
+ Assert.assertFalse(timeout.isPresent());
+ }
+
+ @Test
+ public void testCheckTimeoutConnectionTimeouted() throws Exception {
+ final Consumer<Response<?, ?>> callback = mock(Consumer.class);
+ connection.sendRequest(createRequest(replyToProbe.ref()), callback);
+ final long now = context.ticker().read() + ConnectedClientConnection.REQUEST_TIMEOUT_NANOS;
+ final Optional<FiniteDuration> timeout = connection.checkTimeout(now);
+ Assert.assertNull(timeout);
+ }
+
+ @Test
+ public void testCheckTimeout() throws Exception {
+ final Consumer<Response<?, ?>> callback = mock(Consumer.class);
+ connection.sendRequest(createRequest(replyToProbe.ref()), callback);
+ final long now = context.ticker().read();
+ final Optional<FiniteDuration> timeout = connection.checkTimeout(now);
+ Assert.assertTrue(timeout.isPresent());
+ }
+
+ @Test
+ public void testReplay() throws Exception {
+ final Consumer<Response<?, ?>> callback = mock(Consumer.class);
+ final Request<?, ?> request1 = createRequest(replyToProbe.ref());
+ final Request<?, ?> request2 = createRequest(replyToProbe.ref());
+ connection.sendRequest(request1, callback);
+ connection.sendRequest(request2, callback);
+ final Iterable<ConnectionEntry> entries = connection.startReplay();
+ Assert.assertThat(entries, hasItems(entryWithRequest(request1), entryWithRequest(request2)));
+ final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
+ connection.finishReplay(forwarder);
+ verify(forwarder).forwardEntry(argThat(entryWithRequest(request1)), anyLong());
+ verify(forwarder).forwardEntry(argThat(entryWithRequest(request2)), anyLong());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ JavaTestKit.shutdownActorSystem(system);
+ }
+
+ protected Request<?, ?> createRequest(final ActorRef replyTo) {
+ final TransactionIdentifier identifier =
+ new TransactionIdentifier(new LocalHistoryIdentifier(CLIENT_ID, 0L), 0L);
+ return new AbortLocalTransactionRequest(identifier, replyTo);
+ }
+
+ private static ConnectionEntryMatcher entryWithRequest(final Request<?, ?> request) {
+ return new ConnectionEntryMatcher(request);
+ }
+
+ private static class ConnectionEntryMatcher extends BaseMatcher<ConnectionEntry> {
+
+ private final Request request;
+
+ private ConnectionEntryMatcher(final Request request) {
+ this.request = request;
+ }
+
+ @Override
+ public boolean matches(final Object item) {
+ if (!(item instanceof ConnectionEntry)) {
+ return false;
+ }
+ final ConnectionEntry entry = (ConnectionEntry) item;
+ return this.request.equals(entry.getRequest());
+ }
+
+ @Override
+ public void describeMismatch(final Object item, final Description description) {
+ final ConnectionEntry entry = (ConnectionEntry) item;
+ super.describeMismatch(entry.getRequest(), description);
+ }
+
+ @Override
+ public void describeTo(final Description description) {
+ description.appendValue(request);
+ }
+ }
+
+}
import static org.junit.Assert.assertSame;
-import akka.actor.ActorRef;
-import akka.actor.Scheduler;
-import akka.dispatch.Dispatcher;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
import com.google.common.base.Ticker;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
import org.opendaylight.controller.cluster.access.concepts.FrontendType;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
public class ClientActorContextTest {
private static final MemberName MEMBER_NAME = MemberName.forName("member-1");
private static final String PERSISTENCE_ID = ClientActorContextTest.class.getSimpleName();
@Mock
- private ActorRef mockSelf;
-
- @Mock
- private Scheduler mockScheduler;
-
- @Mock
- private Dispatcher mockDispatcher;
+ private InternalCommand<? extends BackendInfo> command;
+ private ActorSystem system;
+ private TestProbe probe;
+ private ClientActorContext ctx;
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
+ system = ActorSystem.apply();
+ probe = new TestProbe(system);
+ ctx = new ClientActorContext(probe.ref(), system.scheduler(), system.dispatcher(),
+ PERSISTENCE_ID, CLIENT_ID);
}
@Test
public void testMockingControl() {
- ClientActorContext ctx = new ClientActorContext(mockSelf, mockScheduler, mockDispatcher,
- PERSISTENCE_ID, CLIENT_ID);
assertSame(CLIENT_ID, ctx.getIdentifier());
assertSame(PERSISTENCE_ID, ctx.persistenceId());
- assertSame(mockSelf, ctx.self());
+ assertSame(probe.ref(), ctx.self());
}
@Test
public void testTicker() {
- ClientActorContext ctx = new ClientActorContext(mockSelf, mockScheduler, mockDispatcher,
- PERSISTENCE_ID, CLIENT_ID);
assertSame(Ticker.systemTicker(), ctx.ticker());
}
+
+ @Test
+ public void testExecuteInActor() throws Exception {
+ ctx.executeInActor(command);
+ probe.expectMsg(command);
+ }
+
+ @Test
+ public void testExecuteInActorScheduled() throws Exception {
+ final FiniteDuration delay = Duration.apply(1, TimeUnit.SECONDS);
+ ctx.executeInActor(command, delay);
+ probe.expectMsg(command);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ JavaTestKit.shutdownActorSystem(system);
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.util.function.Consumer;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+public class ReconnectingClientConnectionTest
+ extends AbstractClientConnectionTest<ReconnectingClientConnection<BackendInfo>, BackendInfo> {
+
+ @Override
+ protected ReconnectingClientConnection<BackendInfo> createConnection() {
+ final BackendInfo backend = new BackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON, 10);
+
+ final ConnectedClientConnection<BackendInfo> oldConnection =
+ new ConnectedClientConnection<>(context, 0L, backend);
+ return new ReconnectingClientConnection<>(oldConnection);
+ }
+
+ @Override
+ @Test
+ public void testReconnectConnection() throws Exception {
+ final ClientActorBehavior<BackendInfo> behavior = mock(ClientActorBehavior.class);
+ Assert.assertSame(behavior, connection.reconnectConnection(behavior));
+ }
+
+ @Override
+ @Test
+ public void testSendRequestReceiveResponse() throws Exception {
+ final Consumer<Response<?, ?>> callback = mock(Consumer.class);
+ final Request<?, ?> request = createRequest(replyToProbe.ref());
+ connection.sendRequest(request, callback);
+ backendProbe.expectNoMsg();
+ final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(CLIENT_ID, 0L);
+ final RequestSuccess<?, ?> message = new TransactionAbortSuccess(new TransactionIdentifier(historyId, 0L), 0L);
+ final ResponseEnvelope<?> envelope = new SuccessEnvelope(message, 0L, 0L, 0L);
+ connection.receiveResponse(envelope);
+ verify(callback, after(1000).never()).accept(any());
+ }
+}
\ No newline at end of file