/* * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.cluster.raft.utils; import akka.dispatch.Futures; import akka.persistence.SelectedSnapshot; import akka.persistence.SnapshotMetadata; import akka.persistence.SnapshotSelectionCriteria; import akka.persistence.snapshot.japi.SnapshotStore; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Uninterruptibles; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; /** * An akka SnapshotStore implementation that stores data in memory. This is intended for testing. * * @author Thomas Pantelis */ public class InMemorySnapshotStore extends SnapshotStore { static final Logger LOG = LoggerFactory.getLogger(InMemorySnapshotStore.class); private static final Map SNAPSHOT_SAVED_LATCHES = new ConcurrentHashMap<>(); private static final Map SNAPSHOT_DELETED_LATCHES = new ConcurrentHashMap<>(); private static Map> snapshots = new ConcurrentHashMap<>(); public static void addSnapshot(String persistentId, Object snapshot) { List snapshotList = snapshots.get(persistentId); if (snapshotList == null) { snapshotList = new ArrayList<>(); snapshots.put(persistentId, snapshotList); } synchronized (snapshotList) { snapshotList.add(new StoredSnapshot(new SnapshotMetadata(persistentId, snapshotList.size(), System.currentTimeMillis()), snapshot)); } } @SuppressWarnings("unchecked") public static List getSnapshots(String persistentId, Class type) { List stored = snapshots.get(persistentId); if (stored == null) { return Collections.emptyList(); } List retList; synchronized (stored) { retList = Lists.newArrayListWithCapacity(stored.size()); for (StoredSnapshot s: stored) { if (type.isInstance(s.data)) { retList.add((T) s.data); } } } return retList; } public static void clear() { snapshots.clear(); } public static void addSnapshotSavedLatch(String persistenceId) { SNAPSHOT_SAVED_LATCHES.put(persistenceId, new CountDownLatch(1)); } public static void addSnapshotDeletedLatch(String persistenceId) { SNAPSHOT_DELETED_LATCHES.put(persistenceId, new CountDownLatch(1)); } public static T waitForSavedSnapshot(String persistenceId, Class type) { if (!Uninterruptibles.awaitUninterruptibly(SNAPSHOT_SAVED_LATCHES.get(persistenceId), 5, TimeUnit.SECONDS)) { throw new AssertionError("Snapshot was not saved"); } return getSnapshots(persistenceId, type).get(0); } public static void waitForDeletedSnapshot(String persistenceId) { if (!Uninterruptibles.awaitUninterruptibly(SNAPSHOT_DELETED_LATCHES.get(persistenceId), 5, TimeUnit.SECONDS)) { throw new AssertionError("Snapshot was not deleted"); } } @Override public Future> doLoadAsync(String persistenceId, SnapshotSelectionCriteria snapshotSelectionCriteria) { List snapshotList = snapshots.get(persistenceId); if (snapshotList == null) { return Futures.successful(Optional.empty()); } synchronized (snapshotList) { for (int i = snapshotList.size() - 1; i >= 0; i--) { StoredSnapshot snapshot = snapshotList.get(i); if (matches(snapshot, snapshotSelectionCriteria)) { return Futures.successful(Optional.of(new SelectedSnapshot(snapshot.metadata, snapshot.data))); } } } return Futures.successful(Optional.empty()); } private static boolean matches(StoredSnapshot snapshot, SnapshotSelectionCriteria criteria) { return snapshot.metadata.sequenceNr() <= criteria.maxSequenceNr() && snapshot.metadata.timestamp() <= criteria.maxTimestamp(); } @Override public Future doSaveAsync(SnapshotMetadata snapshotMetadata, Object obj) { List snapshotList = snapshots.get(snapshotMetadata.persistenceId()); LOG.trace("doSaveAsync: persistentId {}: sequenceNr: {}: timestamp {}: {}", snapshotMetadata.persistenceId(), snapshotMetadata.sequenceNr(), snapshotMetadata.timestamp(), obj); if (snapshotList == null) { snapshotList = new ArrayList<>(); snapshots.put(snapshotMetadata.persistenceId(), snapshotList); } synchronized (snapshotList) { snapshotList.add(new StoredSnapshot(snapshotMetadata, obj)); } CountDownLatch latch = SNAPSHOT_SAVED_LATCHES.get(snapshotMetadata.persistenceId()); if (latch != null) { latch.countDown(); } return Futures.successful(null); } @Override public Future doDeleteAsync(SnapshotMetadata metadata) { List snapshotList = snapshots.get(metadata.persistenceId()); if (snapshotList != null) { synchronized (snapshotList) { for (int i = 0; i < snapshotList.size(); i++) { StoredSnapshot snapshot = snapshotList.get(i); if (metadata.equals(snapshot.metadata)) { snapshotList.remove(i); break; } } } } return Futures.successful(null); } @Override public Future doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) { LOG.trace("doDelete: persistentId {}: maxSequenceNr: {}: maxTimestamp {}", persistenceId, criteria.maxSequenceNr(), criteria.maxTimestamp()); List snapshotList = snapshots.get(persistenceId); if (snapshotList != null) { synchronized (snapshotList) { Iterator iter = snapshotList.iterator(); while (iter.hasNext()) { StoredSnapshot stored = iter.next(); if (matches(stored, criteria)) { LOG.trace("Deleting snapshot for sequenceNr: {}, timestamp: {}: {}", stored.metadata.sequenceNr(), stored.metadata.timestamp(), stored.data); iter.remove(); } } } } CountDownLatch latch = SNAPSHOT_DELETED_LATCHES.get(persistenceId); if (latch != null) { latch.countDown(); } return Futures.successful(null); } private static class StoredSnapshot { private final SnapshotMetadata metadata; private final Object data; private StoredSnapshot(SnapshotMetadata metadata, Object data) { this.metadata = metadata; this.data = data; } } }