/* * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.cluster.access.client; import static java.util.Objects.requireNonNull; import akka.actor.ActorRef; import akka.dispatch.OnComplete; import akka.pattern.Patterns; import akka.persistence.SelectedSnapshot; import akka.persistence.SnapshotMetadata; import akka.persistence.SnapshotSelectionCriteria; import akka.persistence.snapshot.japi.SnapshotStore; import java.util.Optional; import scala.concurrent.Future; import scala.concurrent.Promise; /** * Instantiated by akka. MockedSnapshotStore forwards method calls as * {@link MockedSnapshotStoreMessage} messages to delegate actor. Delegate reference * must be sent as a message to this snapshot store. */ class MockedSnapshotStore extends SnapshotStore { private static final long TIMEOUT = 1000; private ActorRef delegate; /** * Marker interface for messages produced by MockedSnapshotStore. */ interface MockedSnapshotStoreMessage { } @Override public Future> doLoadAsync(final String persistenceId, final SnapshotSelectionCriteria criteria) { return askDelegate(new LoadRequest(persistenceId, criteria)); } @Override public Future doSaveAsync(final SnapshotMetadata metadata, final Object snapshot) { return askDelegate(new SaveRequest(metadata, snapshot)); } @Override public Future doDeleteAsync(final SnapshotMetadata metadata) { return askDelegate(new DeleteByMetadataRequest(metadata)); } @Override public Future doDeleteAsync(final String persistenceId, final SnapshotSelectionCriteria criteria) { return askDelegate(new DeleteByCriteriaRequest(persistenceId, criteria)); } @Override public void unhandled(final Object message) { if (message instanceof ActorRef) { delegate = (ActorRef) message; return; } super.unhandled(message); } private Future askDelegate(final MockedSnapshotStoreMessage message) { return transform(Patterns.ask(requireNonNull(delegate, "Delegate ref was not sent"), message, TIMEOUT)); } private Future transform(final Future future) { final Promise promise = new scala.concurrent.impl.Promise.DefaultPromise<>(); future.onComplete(new OnComplete() { @Override public void onComplete(final Throwable failure, final Object success) { if (success instanceof Throwable) { promise.failure((Throwable) success); return; } if (success == Void.TYPE) { promise.success(null); return; } promise.success((T) success); } }, context().dispatcher()); return promise.future(); } class LoadRequest implements MockedSnapshotStoreMessage { private final String persistenceId; private final SnapshotSelectionCriteria criteria; LoadRequest(final String persistenceId, final SnapshotSelectionCriteria criteria) { this.persistenceId = persistenceId; this.criteria = criteria; } public String getPersistenceId() { return persistenceId; } public SnapshotSelectionCriteria getCriteria() { return criteria; } } class DeleteByCriteriaRequest implements MockedSnapshotStoreMessage { private final String persistenceId; private final SnapshotSelectionCriteria criteria; DeleteByCriteriaRequest(final String persistenceId, final SnapshotSelectionCriteria criteria) { this.persistenceId = persistenceId; this.criteria = criteria; } public String getPersistenceId() { return persistenceId; } public SnapshotSelectionCriteria getCriteria() { return criteria; } } class DeleteByMetadataRequest implements MockedSnapshotStoreMessage { private final SnapshotMetadata metadata; DeleteByMetadataRequest(final SnapshotMetadata metadata) { this.metadata = metadata; } public SnapshotMetadata getMetadata() { return metadata; } } class SaveRequest implements MockedSnapshotStoreMessage { private final SnapshotMetadata metadata; private final Object snapshot; SaveRequest(final SnapshotMetadata metadata, final Object snapshot) { this.metadata = metadata; this.snapshot = snapshot; } public SnapshotMetadata getMetadata() { return metadata; } public Object getSnapshot() { return snapshot; } } }