import akka.persistence.PersistentImpl;
import akka.persistence.PersistentRepr;
import akka.persistence.journal.japi.AsyncWriteJournal;
-import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
}
public static void addEntry(String persistenceId, long sequenceNr, Object data) {
- Map<Long, Object> journal = JOURNALS.get(persistenceId);
- if (journal == null) {
- journal = Maps.newLinkedHashMap();
- JOURNALS.put(persistenceId, journal);
- }
+ Map<Long, Object> journal = JOURNALS.computeIfAbsent(persistenceId, k -> new LinkedHashMap<>());
synchronized (journal) {
journal.put(sequenceNr, data instanceof Serializable
public static void clear() {
JOURNALS.clear();
+ DELETE_MESSAGES_COMPLETE_LATCHES.clear();
+ WRITE_MESSAGES_COMPLETE.clear();
+ BLOCK_READ_MESSAGES_LATCHES.clear();
}
@SuppressWarnings("unchecked")
Map<Long, Object> journal = JOURNALS.get(persistenceId);
if (journal != null) {
synchronized (journal) {
- Iterator<Long> iter = journal.keySet().iterator();
- while (iter.hasNext()) {
- Long num = iter.next();
- if (num <= toSequenceNr) {
- iter.remove();
- }
- }
+ journal.keySet().removeIf(num -> num <= toSequenceNr);
}
}