X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Futils%2FInMemorySnapshotStore.java;h=a7e751c23e4de0abd8ab2b1b337c6261401e65b9;hb=refs%2Fchanges%2F24%2F32524%2F19;hp=01f337567560ba02f7849c8b920cd5f05b18f949;hpb=d36d8d28eaf7e4cc9ac0bd2972e11346819d4c3c;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemorySnapshotStore.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemorySnapshotStore.java index 01f3375675..a7e751c23e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemorySnapshotStore.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemorySnapshotStore.java @@ -9,19 +9,21 @@ package org.opendaylight.controller.cluster.raft.utils; import akka.dispatch.Futures; -import akka.japi.Option; import akka.persistence.SelectedSnapshot; import akka.persistence.SnapshotMetadata; import akka.persistence.SnapshotSelectionCriteria; import akka.persistence.snapshot.japi.SnapshotStore; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Uninterruptibles; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; @@ -36,6 +38,8 @@ public class InMemorySnapshotStore extends SnapshotStore { static final Logger LOG = LoggerFactory.getLogger(InMemorySnapshotStore.class); private static Map> snapshots = new ConcurrentHashMap<>(); + private static final Map snapshotSavedLatches = new ConcurrentHashMap<>(); + private static final Map snapshotDeletedLatches = new ConcurrentHashMap<>(); public static void addSnapshot(String persistentId, Object snapshot) { List snapshotList = snapshots.get(persistentId); @@ -62,8 +66,8 @@ public class InMemorySnapshotStore extends SnapshotStore { synchronized (stored) { retList = Lists.newArrayListWithCapacity(stored.size()); for(StoredSnapshot s: stored) { - if(type.isInstance(s.getData())) { - retList.add((T) s.getData()); + if(type.isInstance(s.data)) { + retList.add((T) s.data); } } } @@ -75,24 +79,61 @@ public class InMemorySnapshotStore extends SnapshotStore { snapshots.clear(); } + public static void addSnapshotSavedLatch(String persistenceId) { + snapshotSavedLatches.put(persistenceId, new CountDownLatch(1)); + } + + public static void addSnapshotDeletedLatch(String persistenceId) { + snapshotDeletedLatches.put(persistenceId, new CountDownLatch(1)); + } + + public static T waitForSavedSnapshot(String persistenceId, Class type) { + if(!Uninterruptibles.awaitUninterruptibly(snapshotSavedLatches.get(persistenceId), 5, TimeUnit.SECONDS)) { + throw new AssertionError("Snapshot was not saved"); + } + + return getSnapshots(persistenceId, type).get(0); + } + + public static void waitForDeletedSnapshot(String persistenceId) { + if(!Uninterruptibles.awaitUninterruptibly(snapshotDeletedLatches.get(persistenceId), 5, TimeUnit.SECONDS)) { + throw new AssertionError("Snapshot was not deleted"); + } + } + @Override - public Future> doLoadAsync(String s, - SnapshotSelectionCriteria snapshotSelectionCriteria) { - List snapshotList = snapshots.get(s); + public Future> doLoadAsync(String persistenceId, + SnapshotSelectionCriteria snapshotSelectionCriteria) { + List snapshotList = snapshots.get(persistenceId); if(snapshotList == null){ - return Futures.successful(Option.none()); + return Futures.successful(Optional.empty()); } - StoredSnapshot snapshot = Iterables.getLast(snapshotList); - SelectedSnapshot selectedSnapshot = - new SelectedSnapshot(snapshot.getMetadata(), snapshot.getData()); - return Futures.successful(Option.some(selectedSnapshot)); + synchronized(snapshotList) { + for(int i = snapshotList.size() - 1; i >= 0; i--) { + StoredSnapshot snapshot = snapshotList.get(i); + if(matches(snapshot, snapshotSelectionCriteria)) { + return Futures.successful(Optional.of(new SelectedSnapshot(snapshot.metadata, + snapshot.data))); + } + } + } + + return Futures.successful(Optional.empty()); + } + + private static boolean matches(StoredSnapshot snapshot, SnapshotSelectionCriteria criteria) { + return snapshot.metadata.sequenceNr() <= criteria.maxSequenceNr() && + snapshot.metadata.timestamp() <= criteria.maxTimestamp(); } @Override public Future doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) { List snapshotList = snapshots.get(snapshotMetadata.persistenceId()); + LOG.trace("doSaveAsync: persistentId {}: sequenceNr: {}: timestamp {}: {}", snapshotMetadata.persistenceId(), + snapshotMetadata.sequenceNr(), snapshotMetadata.timestamp(), o); + if(snapshotList == null){ snapshotList = new ArrayList<>(); snapshots.put(snapshotMetadata.persistenceId(), snapshotList); @@ -101,59 +142,60 @@ public class InMemorySnapshotStore extends SnapshotStore { snapshotList.add(new StoredSnapshot(snapshotMetadata, o)); } - return Futures.successful(null); - } + CountDownLatch latch = snapshotSavedLatches.get(snapshotMetadata.persistenceId()); + if(latch != null) { + latch.countDown(); + } - @Override - public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception { + return Futures.successful(null); } @Override - public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception { - List snapshotList = snapshots.get(snapshotMetadata.persistenceId()); - - if(snapshotList == null){ - return; - } - - int deleteIndex = -1; - - synchronized (snapshotList) { - for(int i=0;i doDeleteAsync(SnapshotMetadata metadata) { + List snapshotList = snapshots.get(metadata.persistenceId()); + + if (snapshotList != null) { + synchronized (snapshotList) { + for(int i=0;i snapshotList = snapshots.get(persistentId); - - if(snapshotList == null){ - return; - } - - synchronized (snapshotList) { - Iterator iter = snapshotList.iterator(); - while(iter.hasNext()) { - StoredSnapshot s = iter.next(); - LOG.trace("doDelete: sequenceNr: {}, maxSequenceNr: {}", s.getMetadata().sequenceNr(), - snapshotSelectionCriteria.maxSequenceNr()); - - if(s.getMetadata().sequenceNr() <= snapshotSelectionCriteria.maxSequenceNr()) { - iter.remove(); + public Future doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) { + LOG.trace("doDelete: persistentId {}: maxSequenceNr: {}: maxTimestamp {}", persistenceId, + criteria.maxSequenceNr(), criteria.maxTimestamp()); + + List snapshotList = snapshots.get(persistenceId); + if(snapshotList != null){ + synchronized (snapshotList) { + Iterator iter = snapshotList.iterator(); + while(iter.hasNext()) { + StoredSnapshot s = iter.next(); + if(matches(s, criteria)) { + LOG.trace("Deleting snapshot for sequenceNr: {}, timestamp: {}: {}", + s.metadata.sequenceNr(), s.metadata.timestamp(), s.data); + + iter.remove(); + } } } } + + CountDownLatch latch = snapshotDeletedLatches.get(persistenceId); + if(latch != null) { + latch.countDown(); + } + + return Futures.successful(null); } private static class StoredSnapshot { @@ -164,13 +206,5 @@ public class InMemorySnapshotStore extends SnapshotStore { this.metadata = metadata; this.data = data; } - - public SnapshotMetadata getMetadata() { - return metadata; - } - - public Object getData() { - return data; - } } }