X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Futils%2FInMemorySnapshotStore.java;h=662a063788fd522412d778eff71000bfcdaa9507;hb=refs%2Fchanges%2F32%2F83832%2F8;hp=01f337567560ba02f7849c8b920cd5f05b18f949;hpb=d36d8d28eaf7e4cc9ac0bd2972e11346819d4c3c;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemorySnapshotStore.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemorySnapshotStore.java index 01f3375675..662a063788 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemorySnapshotStore.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemorySnapshotStore.java @@ -5,23 +5,23 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.raft.utils; import akka.dispatch.Futures; -import akka.japi.Option; import akka.persistence.SelectedSnapshot; import akka.persistence.SnapshotMetadata; import akka.persistence.SnapshotSelectionCriteria; import akka.persistence.snapshot.japi.SnapshotStore; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Uninterruptibles; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; @@ -35,15 +35,12 @@ public class InMemorySnapshotStore extends SnapshotStore { static final Logger LOG = LoggerFactory.getLogger(InMemorySnapshotStore.class); + private static final Map SNAPSHOT_SAVED_LATCHES = new ConcurrentHashMap<>(); + private static final Map SNAPSHOT_DELETED_LATCHES = new ConcurrentHashMap<>(); private static Map> snapshots = new ConcurrentHashMap<>(); - public static void addSnapshot(String persistentId, Object snapshot) { - List snapshotList = snapshots.get(persistentId); - - if(snapshotList == null) { - snapshotList = new ArrayList<>(); - snapshots.put(persistentId, snapshotList); - } + public static void addSnapshot(final String persistentId, final Object snapshot) { + List snapshotList = snapshots.computeIfAbsent(persistentId, k -> new ArrayList<>()); synchronized (snapshotList) { snapshotList.add(new StoredSnapshot(new SnapshotMetadata(persistentId, snapshotList.size(), @@ -52,18 +49,18 @@ public class InMemorySnapshotStore extends SnapshotStore { } @SuppressWarnings("unchecked") - public static List getSnapshots(String persistentId, Class type) { + public static List getSnapshots(final String persistentId, final Class type) { List stored = snapshots.get(persistentId); - if(stored == null) { + if (stored == null) { return Collections.emptyList(); } List retList; synchronized (stored) { - retList = Lists.newArrayListWithCapacity(stored.size()); - for(StoredSnapshot s: stored) { - if(type.isInstance(s.getData())) { - retList.add((T) s.getData()); + retList = new ArrayList<>(stored.size()); + for (StoredSnapshot s: stored) { + if (type.isInstance(s.data)) { + retList.add((T) s.data); } } } @@ -71,106 +68,140 @@ public class InMemorySnapshotStore extends SnapshotStore { return retList; } + public static void clearSnapshotsFor(final String persistenceId) { + snapshots.remove(persistenceId); + } + public static void clear() { snapshots.clear(); } - @Override - public Future> doLoadAsync(String s, - SnapshotSelectionCriteria snapshotSelectionCriteria) { - List snapshotList = snapshots.get(s); - if(snapshotList == null){ - return Futures.successful(Option.none()); + public static void addSnapshotSavedLatch(final String persistenceId) { + SNAPSHOT_SAVED_LATCHES.put(persistenceId, new CountDownLatch(1)); + } + + public static void addSnapshotDeletedLatch(final String persistenceId) { + SNAPSHOT_DELETED_LATCHES.put(persistenceId, new CountDownLatch(1)); + } + + public static T waitForSavedSnapshot(final String persistenceId, final Class type) { + if (!Uninterruptibles.awaitUninterruptibly(SNAPSHOT_SAVED_LATCHES.get(persistenceId), 5, TimeUnit.SECONDS)) { + throw new AssertionError("Snapshot was not saved"); } - StoredSnapshot snapshot = Iterables.getLast(snapshotList); - SelectedSnapshot selectedSnapshot = - new SelectedSnapshot(snapshot.getMetadata(), snapshot.getData()); - return Futures.successful(Option.some(selectedSnapshot)); + return getSnapshots(persistenceId, type).get(0); } - @Override - public Future doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) { - List snapshotList = snapshots.get(snapshotMetadata.persistenceId()); + public static void waitForDeletedSnapshot(final String persistenceId) { + if (!Uninterruptibles.awaitUninterruptibly(SNAPSHOT_DELETED_LATCHES.get(persistenceId), 5, TimeUnit.SECONDS)) { + throw new AssertionError("Snapshot was not deleted"); + } + } - if(snapshotList == null){ - snapshotList = new ArrayList<>(); - snapshots.put(snapshotMetadata.persistenceId(), snapshotList); + @Override + public Future> doLoadAsync(final String persistenceId, + final SnapshotSelectionCriteria snapshotSelectionCriteria) { + List snapshotList = snapshots.get(persistenceId); + if (snapshotList == null) { + return Futures.successful(Optional.empty()); } + synchronized (snapshotList) { - snapshotList.add(new StoredSnapshot(snapshotMetadata, o)); + for (int i = snapshotList.size() - 1; i >= 0; i--) { + StoredSnapshot snapshot = snapshotList.get(i); + if (matches(snapshot, snapshotSelectionCriteria)) { + return Futures.successful(Optional.of(new SelectedSnapshot(snapshot.metadata, + snapshot.data))); + } + } } - return Futures.successful(null); + return Futures.successful(Optional.empty()); } - @Override - public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception { + private static boolean matches(final StoredSnapshot snapshot, final SnapshotSelectionCriteria criteria) { + return snapshot.metadata.sequenceNr() <= criteria.maxSequenceNr() + && snapshot.metadata.timestamp() <= criteria.maxTimestamp(); } @Override - public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception { + public Future doSaveAsync(final SnapshotMetadata snapshotMetadata, final Object obj) { List snapshotList = snapshots.get(snapshotMetadata.persistenceId()); - if(snapshotList == null){ - return; - } - - int deleteIndex = -1; + LOG.trace("doSaveAsync: persistentId {}: sequenceNr: {}: timestamp {}: {}", snapshotMetadata.persistenceId(), + snapshotMetadata.sequenceNr(), snapshotMetadata.timestamp(), obj); + if (snapshotList == null) { + snapshotList = new ArrayList<>(); + snapshots.put(snapshotMetadata.persistenceId(), snapshotList); + } synchronized (snapshotList) { - for(int i=0;i snapshotList = snapshots.get(persistentId); - - if(snapshotList == null){ - return; + public Future doDeleteAsync(final SnapshotMetadata metadata) { + List snapshotList = snapshots.get(metadata.persistenceId()); + + if (snapshotList != null) { + synchronized (snapshotList) { + for (int i = 0; i < snapshotList.size(); i++) { + StoredSnapshot snapshot = snapshotList.get(i); + if (metadata.equals(snapshot.metadata)) { + snapshotList.remove(i); + break; + } + } + } } - synchronized (snapshotList) { - Iterator iter = snapshotList.iterator(); - while(iter.hasNext()) { - StoredSnapshot s = iter.next(); - LOG.trace("doDelete: sequenceNr: {}, maxSequenceNr: {}", s.getMetadata().sequenceNr(), - snapshotSelectionCriteria.maxSequenceNr()); - - if(s.getMetadata().sequenceNr() <= snapshotSelectionCriteria.maxSequenceNr()) { - iter.remove(); + return Futures.successful(null); + } + + @Override + public Future doDeleteAsync(final String persistenceId, final SnapshotSelectionCriteria criteria) { + LOG.trace("doDelete: persistentId {}: maxSequenceNr: {}: maxTimestamp {}", persistenceId, + criteria.maxSequenceNr(), criteria.maxTimestamp()); + + List snapshotList = snapshots.get(persistenceId); + if (snapshotList != null) { + synchronized (snapshotList) { + Iterator iter = snapshotList.iterator(); + while (iter.hasNext()) { + StoredSnapshot stored = iter.next(); + if (matches(stored, criteria)) { + LOG.trace("Deleting snapshot for sequenceNr: {}, timestamp: {}: {}", + stored.metadata.sequenceNr(), stored.metadata.timestamp(), stored.data); + + iter.remove(); + } } } } + + CountDownLatch latch = SNAPSHOT_DELETED_LATCHES.get(persistenceId); + if (latch != null) { + latch.countDown(); + } + + return Futures.successful(null); } - private static class StoredSnapshot { + private static final class StoredSnapshot { private final SnapshotMetadata metadata; private final Object data; - private StoredSnapshot(SnapshotMetadata metadata, Object data) { + StoredSnapshot(final SnapshotMetadata metadata, final Object data) { this.metadata = metadata; this.data = data; } - - public SnapshotMetadata getMetadata() { - return metadata; - } - - public Object getData() { - return data; - } } }