private static Map<String, List<StoredSnapshot>> snapshots = new ConcurrentHashMap<>();
private static final Map<String, CountDownLatch> snapshotSavedLatches = new ConcurrentHashMap<>();
+ private static final Map<String, CountDownLatch> snapshotDeletedLatches = new ConcurrentHashMap<>();
public static void addSnapshot(String persistentId, Object snapshot) {
List<StoredSnapshot> snapshotList = snapshots.get(persistentId);
snapshotSavedLatches.put(persistenceId, new CountDownLatch(1));
}
+ public static void addSnapshotDeletedLatch(String persistenceId) {
+ snapshotDeletedLatches.put(persistenceId, new CountDownLatch(1));
+ }
+
public static <T> T waitForSavedSnapshot(String persistenceId, Class<T> type) {
if(!Uninterruptibles.awaitUninterruptibly(snapshotSavedLatches.get(persistenceId), 5, TimeUnit.SECONDS)) {
throw new AssertionError("Snapshot was not saved");
return getSnapshots(persistenceId, type).get(0);
}
+ public static void waitForDeletedSnapshot(String persistenceId) {
+ if(!Uninterruptibles.awaitUninterruptibly(snapshotDeletedLatches.get(persistenceId), 5, TimeUnit.SECONDS)) {
+ throw new AssertionError("Snapshot was not deleted");
+ }
+ }
+
@Override
public Future<Option<SelectedSnapshot>> doLoadAsync(String persistenceId,
SnapshotSelectionCriteria snapshotSelectionCriteria) {
public Future<Void> doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) {
List<StoredSnapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
+ LOG.trace("doSaveAsync: persistentId {}: sequenceNr: {}: timestamp {}: {}", snapshotMetadata.persistenceId(),
+ snapshotMetadata.sequenceNr(), snapshotMetadata.timestamp(), o);
+
if(snapshotList == null){
snapshotList = new ArrayList<>();
snapshots.put(snapshotMetadata.persistenceId(), snapshotList);
snapshotSelectionCriteria.maxSequenceNr(), snapshotSelectionCriteria.maxTimestamp());
List<StoredSnapshot> snapshotList = snapshots.get(persistentId);
- if(snapshotList == null){
- return;
- }
-
- synchronized (snapshotList) {
- Iterator<StoredSnapshot> iter = snapshotList.iterator();
- while(iter.hasNext()) {
- StoredSnapshot s = iter.next();
- if(matches(s, snapshotSelectionCriteria)) {
- LOG.trace("Deleting snapshot for sequenceNr: {}, timestamp: {}",
- s.metadata.sequenceNr(), s.metadata.timestamp());
-
- iter.remove();
+ if(snapshotList != null){
+ synchronized (snapshotList) {
+ Iterator<StoredSnapshot> iter = snapshotList.iterator();
+ while(iter.hasNext()) {
+ StoredSnapshot s = iter.next();
+ if(matches(s, snapshotSelectionCriteria)) {
+ LOG.trace("Deleting snapshot for sequenceNr: {}, timestamp: {}: {}",
+ s.metadata.sequenceNr(), s.metadata.timestamp(), s.data);
+
+ iter.remove();
+ }
}
}
}
+
+ CountDownLatch latch = snapshotDeletedLatches.get(persistentId);
+ if(latch != null) {
+ latch.countDown();
+ }
}
private static class StoredSnapshot {