--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.persistence.Persistence;
+import akka.persistence.SelectedSnapshot;
+import akka.persistence.SnapshotMetadata;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import java.lang.reflect.Field;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendType;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+public class ActorBehaviorTest {
+
+ private static final String MEMBER_1_FRONTEND_TYPE_1 = "member-1-frontend-type-1";
+ private static final FiniteDuration TIMEOUT = Duration.apply(5, TimeUnit.SECONDS);
+
+ private ActorSystem system;
+ private TestProbe probe;
+ private ClientActorBehavior<BackendInfo> initialBehavior;
+ private MockedSnapshotStore.SaveRequest saveRequest;
+ private FrontendIdentifier id;
+ private ActorRef mockedActor;
+
+ @Before
+ public void setUp() throws Exception {
+ initialBehavior = createInitialBehaviorMock();
+ system = ActorSystem.apply("system1");
+ final ActorRef storeRef = system.registerExtension(Persistence.lookup()).snapshotStoreFor(null);
+ probe = new TestProbe(system);
+ storeRef.tell(probe.ref(), ActorRef.noSender());
+ final MemberName name = MemberName.forName("member-1");
+ id = FrontendIdentifier.create(name, FrontendType.forName("type-1"));
+ mockedActor = system.actorOf(MockedActor.props(id, initialBehavior));
+ //handle initial actor recovery
+ saveRequest = handleRecovery(null);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ JavaTestKit.shutdownActorSystem(system);
+ }
+
+ @Test
+ public void testInitialBehavior() throws Exception {
+ final InternalCommand<BackendInfo> cmd = mock(InternalCommand.class);
+ when(cmd.execute(any())).thenReturn(initialBehavior);
+ mockedActor.tell(cmd, ActorRef.noSender());
+ verify(cmd, timeout(1000)).execute(initialBehavior);
+ }
+
+ @Test
+ public void testCommandStashing() throws Exception {
+ system.stop(mockedActor);
+ mockedActor = system.actorOf(MockedActor.props(id, initialBehavior));
+ final InternalCommand<BackendInfo> cmd = mock(InternalCommand.class);
+ when(cmd.execute(any())).thenReturn(initialBehavior);
+ //send messages before recovery is completed
+ mockedActor.tell(cmd, ActorRef.noSender());
+ mockedActor.tell(cmd, ActorRef.noSender());
+ mockedActor.tell(cmd, ActorRef.noSender());
+ //complete recovery
+ handleRecovery(null);
+ verify(cmd, timeout(1000).times(3)).execute(initialBehavior);
+ }
+
+ @Test
+ public void testRecoveryAfterRestart() throws Exception {
+ system.stop(mockedActor);
+ mockedActor = system.actorOf(MockedActor.props(id, initialBehavior));
+ final MockedSnapshotStore.SaveRequest newSaveRequest =
+ handleRecovery(new SelectedSnapshot(saveRequest.getMetadata(), saveRequest.getSnapshot()));
+ Assert.assertEquals(MEMBER_1_FRONTEND_TYPE_1, newSaveRequest.getMetadata().persistenceId());
+ }
+
+ @Test
+ public void testRecoveryAfterRestartFrontendIdMismatch() throws Exception {
+ system.stop(mockedActor);
+ //start actor again
+ mockedActor = system.actorOf(MockedActor.props(id, initialBehavior));
+ probe.expectMsgClass(MockedSnapshotStore.LoadRequest.class);
+ //offer snapshot with incorrect client id
+ final SnapshotMetadata metadata = saveRequest.getMetadata();
+ final FrontendIdentifier anotherFrontend = FrontendIdentifier.create(MemberName.forName("another"),
+ FrontendType.forName("type-2"));
+ final ClientIdentifier incorrectClientId = ClientIdentifier.create(anotherFrontend, 0);
+ probe.watch(mockedActor);
+ probe.reply(Optional.of(new SelectedSnapshot(metadata, incorrectClientId)));
+ //actor should be stopped
+ probe.expectTerminated(mockedActor, TIMEOUT);
+ }
+
+ @Test
+ public void testRecoveryAfterRestartSaveSnapshotFail() throws Exception {
+ system.stop(mockedActor);
+ mockedActor = system.actorOf(MockedActor.props(id, initialBehavior));
+ probe.watch(mockedActor);
+ probe.expectMsgClass(MockedSnapshotStore.LoadRequest.class);
+ probe.reply(Optional.empty());
+ probe.expectMsgClass(MockedSnapshotStore.SaveRequest.class);
+ probe.reply(new RuntimeException("save failed"));
+ probe.expectMsgClass(MockedSnapshotStore.DeleteByMetadataRequest.class);
+ probe.expectTerminated(mockedActor, TIMEOUT);
+ }
+
+ @Test
+ public void testRecoveryAfterRestartDeleteSnapshotsFail() throws Exception {
+ system.stop(mockedActor);
+ mockedActor = system.actorOf(MockedActor.props(id, initialBehavior));
+ probe.watch(mockedActor);
+ probe.expectMsgClass(MockedSnapshotStore.LoadRequest.class);
+ probe.reply(Optional.empty());
+ probe.expectMsgClass(MockedSnapshotStore.SaveRequest.class);
+ probe.reply(Void.TYPE);
+ probe.expectMsgClass(MockedSnapshotStore.DeleteByCriteriaRequest.class);
+ probe.reply(new RuntimeException("delete failed"));
+ //actor shouldn't terminate
+ probe.expectNoMsg();
+ }
+
+ @SuppressWarnings("unchecked")
+ private ClientActorBehavior<BackendInfo> createInitialBehaviorMock() throws Exception {
+ final ClientActorBehavior<BackendInfo> initialBehavior = mock(ClientActorBehavior.class);
+ //persistenceId() in AbstractClientActorBehavior is final and can't be mocked
+ //use reflection to work around this
+ final Field context = AbstractClientActorBehavior.class.getDeclaredField("context");
+ context.setAccessible(true);
+ final AbstractClientActorContext ctx = mock(AbstractClientActorContext.class);
+ context.set(initialBehavior, ctx);
+ final Field persistenceId = AbstractClientActorContext.class.getDeclaredField("persistenceId");
+ persistenceId.setAccessible(true);
+ persistenceId.set(ctx, MEMBER_1_FRONTEND_TYPE_1);
+ return initialBehavior;
+ }
+
+ private MockedSnapshotStore.SaveRequest handleRecovery(final SelectedSnapshot savedState) {
+ probe.expectMsgClass(MockedSnapshotStore.LoadRequest.class);
+ //offer snapshot
+ probe.reply(Optional.ofNullable(savedState));
+ final MockedSnapshotStore.SaveRequest nextSaveRequest =
+ probe.expectMsgClass(MockedSnapshotStore.SaveRequest.class);
+ probe.reply(Void.TYPE);
+ //check old snapshots deleted
+ probe.expectMsgClass(MockedSnapshotStore.DeleteByCriteriaRequest.class);
+ probe.reply(Void.TYPE);
+ return nextSaveRequest;
+ }
+
+ private static class MockedActor extends AbstractClientActor {
+
+ private final ClientActorBehavior initialBehavior;
+
+ private static Props props(final FrontendIdentifier frontendId, final ClientActorBehavior initialBehavior) {
+ return Props.create(MockedActor.class, () -> new MockedActor(frontendId, initialBehavior));
+ }
+
+ private MockedActor(final FrontendIdentifier frontendId, final ClientActorBehavior initialBehavior) {
+ super(frontendId);
+ this.initialBehavior = initialBehavior;
+ }
+
+ @Override
+ protected ClientActorBehavior<?> initialBehavior(final ClientActorContext context) {
+ return initialBehavior;
+ }
+
+ }
+
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+import akka.actor.ActorRef;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import akka.persistence.SelectedSnapshot;
+import akka.persistence.SnapshotMetadata;
+import akka.persistence.SnapshotSelectionCriteria;
+import akka.persistence.snapshot.japi.SnapshotStore;
+import com.google.common.base.Preconditions;
+import java.util.Optional;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+
+/**
+ * Instantiated by akka. MockedSnapshotStore forwards method calls as
+ * {@link MockedSnapshotStoreMessage} messages to delegate actor. Delegate reference
+ * must be sent as a message to this snapshot store.
+ */
+class MockedSnapshotStore extends SnapshotStore {
+
+ private static final long TIMEOUT = 1000;
+
+ private ActorRef delegate;
+
+ /**
+ * Marker interface for messages produced by MockedSnapshotStore.
+ */
+ interface MockedSnapshotStoreMessage {
+ }
+
+ @Override
+ public Future<Optional<SelectedSnapshot>> doLoadAsync(final String persistenceId,
+ final SnapshotSelectionCriteria criteria) {
+ return askDelegate(new LoadRequest(persistenceId, criteria));
+ }
+
+ @Override
+ public Future<Void> doSaveAsync(final SnapshotMetadata metadata, final Object snapshot) {
+ return askDelegate(new SaveRequest(metadata, snapshot));
+ }
+
+ @Override
+ public Future<Void> doDeleteAsync(final SnapshotMetadata metadata) {
+ return askDelegate(new DeleteByMetadataRequest(metadata));
+ }
+
+ @Override
+ public Future<Void> doDeleteAsync(final String persistenceId, final SnapshotSelectionCriteria criteria) {
+ return askDelegate(new DeleteByCriteriaRequest(persistenceId, criteria));
+ }
+
+ @Override
+ public void unhandled(final Object message) {
+ if (message instanceof ActorRef) {
+ delegate = (ActorRef) message;
+ return;
+ }
+ super.unhandled(message);
+ }
+
+ private <T> Future<T> askDelegate(final MockedSnapshotStoreMessage message) {
+ Preconditions.checkNotNull(delegate, "Delegate ref wasn't sent");
+ final Future<Object> ask = Patterns.ask(delegate, message, TIMEOUT);
+ return transform(ask);
+ }
+
+ private <T> Future<T> transform(final Future<Object> future) {
+ final Promise<T> promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
+ future.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object success) throws Throwable {
+ if (success instanceof Throwable) {
+ promise.failure((Throwable) success);
+ return;
+ }
+ if (success == Void.TYPE) {
+ promise.success(null);
+ return;
+ }
+ promise.success((T) success);
+ }
+ }, context().dispatcher());
+ return promise.future();
+ }
+
+ class LoadRequest implements MockedSnapshotStoreMessage {
+ private final String persistenceId;
+ private final SnapshotSelectionCriteria criteria;
+
+ LoadRequest(final String persistenceId, final SnapshotSelectionCriteria criteria) {
+ this.persistenceId = persistenceId;
+ this.criteria = criteria;
+ }
+
+ public String getPersistenceId() {
+ return persistenceId;
+ }
+
+ public SnapshotSelectionCriteria getCriteria() {
+ return criteria;
+ }
+ }
+
+ class DeleteByCriteriaRequest implements MockedSnapshotStoreMessage {
+ private final String persistenceId;
+ private final SnapshotSelectionCriteria criteria;
+
+ DeleteByCriteriaRequest(final String persistenceId, final SnapshotSelectionCriteria criteria) {
+ this.persistenceId = persistenceId;
+ this.criteria = criteria;
+ }
+
+ public String getPersistenceId() {
+ return persistenceId;
+ }
+
+ public SnapshotSelectionCriteria getCriteria() {
+ return criteria;
+ }
+ }
+
+ class DeleteByMetadataRequest implements MockedSnapshotStoreMessage {
+ private final SnapshotMetadata metadata;
+
+ DeleteByMetadataRequest(final SnapshotMetadata metadata) {
+ this.metadata = metadata;
+ }
+
+ public SnapshotMetadata getMetadata() {
+ return metadata;
+ }
+ }
+
+ class SaveRequest implements MockedSnapshotStoreMessage {
+ private final SnapshotMetadata metadata;
+ private final Object snapshot;
+
+ SaveRequest(final SnapshotMetadata metadata, final Object snapshot) {
+ this.metadata = metadata;
+ this.snapshot = snapshot;
+ }
+
+ public SnapshotMetadata getMetadata() {
+ return metadata;
+ }
+
+ public Object getSnapshot() {
+ return snapshot;
+ }
+ }
+}