* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.raft.utils;
import akka.dispatch.Futures;
import akka.persistence.SnapshotMetadata;
import akka.persistence.SnapshotSelectionCriteria;
import akka.persistence.snapshot.japi.SnapshotStore;
-import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import java.util.Collections;
private static final Map<String, CountDownLatch> SNAPSHOT_DELETED_LATCHES = new ConcurrentHashMap<>();
private static Map<String, List<StoredSnapshot>> snapshots = new ConcurrentHashMap<>();
- public static void addSnapshot(String persistentId, Object snapshot) {
- List<StoredSnapshot> snapshotList = snapshots.get(persistentId);
-
- if (snapshotList == null) {
- snapshotList = new ArrayList<>();
- snapshots.put(persistentId, snapshotList);
- }
+ public static void addSnapshot(final String persistentId, final Object snapshot) {
+ List<StoredSnapshot> snapshotList = snapshots.computeIfAbsent(persistentId, k -> new ArrayList<>());
synchronized (snapshotList) {
snapshotList.add(new StoredSnapshot(new SnapshotMetadata(persistentId, snapshotList.size(),
}
@SuppressWarnings("unchecked")
- public static <T> List<T> getSnapshots(String persistentId, Class<T> type) {
+ public static <T> List<T> getSnapshots(final String persistentId, final Class<T> type) {
List<StoredSnapshot> stored = snapshots.get(persistentId);
if (stored == null) {
return Collections.emptyList();
List<T> retList;
synchronized (stored) {
- retList = Lists.newArrayListWithCapacity(stored.size());
+ retList = new ArrayList<>(stored.size());
for (StoredSnapshot s: stored) {
if (type.isInstance(s.data)) {
retList.add((T) s.data);
return retList;
}
+ public static void clearSnapshotsFor(final String persistenceId) {
+ snapshots.remove(persistenceId);
+ }
+
public static void clear() {
snapshots.clear();
}
- public static void addSnapshotSavedLatch(String persistenceId) {
+ public static void addSnapshotSavedLatch(final String persistenceId) {
SNAPSHOT_SAVED_LATCHES.put(persistenceId, new CountDownLatch(1));
}
- public static void addSnapshotDeletedLatch(String persistenceId) {
+ public static void addSnapshotDeletedLatch(final String persistenceId) {
SNAPSHOT_DELETED_LATCHES.put(persistenceId, new CountDownLatch(1));
}
- public static <T> T waitForSavedSnapshot(String persistenceId, Class<T> type) {
+ public static <T> T waitForSavedSnapshot(final String persistenceId, final Class<T> type) {
if (!Uninterruptibles.awaitUninterruptibly(SNAPSHOT_SAVED_LATCHES.get(persistenceId), 5, TimeUnit.SECONDS)) {
throw new AssertionError("Snapshot was not saved");
}
return getSnapshots(persistenceId, type).get(0);
}
- public static void waitForDeletedSnapshot(String persistenceId) {
+ public static void waitForDeletedSnapshot(final String persistenceId) {
if (!Uninterruptibles.awaitUninterruptibly(SNAPSHOT_DELETED_LATCHES.get(persistenceId), 5, TimeUnit.SECONDS)) {
throw new AssertionError("Snapshot was not deleted");
}
}
@Override
- public Future<Optional<SelectedSnapshot>> doLoadAsync(String persistenceId,
- SnapshotSelectionCriteria snapshotSelectionCriteria) {
+ public Future<Optional<SelectedSnapshot>> doLoadAsync(final String persistenceId,
+ final SnapshotSelectionCriteria snapshotSelectionCriteria) {
List<StoredSnapshot> snapshotList = snapshots.get(persistenceId);
if (snapshotList == null) {
return Futures.successful(Optional.<SelectedSnapshot>empty());
return Futures.successful(Optional.<SelectedSnapshot>empty());
}
- private static boolean matches(StoredSnapshot snapshot, SnapshotSelectionCriteria criteria) {
+ private static boolean matches(final StoredSnapshot snapshot, final SnapshotSelectionCriteria criteria) {
return snapshot.metadata.sequenceNr() <= criteria.maxSequenceNr()
&& snapshot.metadata.timestamp() <= criteria.maxTimestamp();
}
@Override
- public Future<Void> doSaveAsync(SnapshotMetadata snapshotMetadata, Object obj) {
+ public Future<Void> doSaveAsync(final SnapshotMetadata snapshotMetadata, final Object obj) {
List<StoredSnapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
LOG.trace("doSaveAsync: persistentId {}: sequenceNr: {}: timestamp {}: {}", snapshotMetadata.persistenceId(),
}
@Override
- public Future<Void> doDeleteAsync(SnapshotMetadata metadata) {
+ public Future<Void> doDeleteAsync(final SnapshotMetadata metadata) {
List<StoredSnapshot> snapshotList = snapshots.get(metadata.persistenceId());
if (snapshotList != null) {
}
@Override
- public Future<Void> doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
+ public Future<Void> doDeleteAsync(final String persistenceId, final SnapshotSelectionCriteria criteria) {
LOG.trace("doDelete: persistentId {}: maxSequenceNr: {}: maxTimestamp {}", persistenceId,
criteria.maxSequenceNr(), criteria.maxTimestamp());
return Futures.successful(null);
}
- private static class StoredSnapshot {
+ private static final class StoredSnapshot {
private final SnapshotMetadata metadata;
private final Object data;
- private StoredSnapshot(SnapshotMetadata metadata, Object data) {
+ StoredSnapshot(final SnapshotMetadata metadata, final Object data) {
this.metadata = metadata;
this.data = data;
}