private static final Map<String, CountDownLatch> deleteMessagesCompleteLatches = new ConcurrentHashMap<>();
+ private static final Map<String, CountDownLatch> blockReadMessagesLatches = new ConcurrentHashMap<>();
+
public static void addEntry(String persistenceId, long sequenceNr, Object data) {
Map<Long, Object> journal = journals.get(persistenceId);
if(journal == null) {
deleteMessagesCompleteLatches.put(persistenceId, new CountDownLatch(1));
}
+ public static void addBlockReadMessagesLatch(String persistenceId, CountDownLatch latch) {
+ blockReadMessagesLatches.put(persistenceId, latch);
+ }
+
@Override
public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr,
long toSequenceNr, long max, final Procedure<PersistentRepr> replayCallback) {
return Futures.future(new Callable<Void>() {
@Override
public Void call() throws Exception {
+ CountDownLatch blockLatch = blockReadMessagesLatches.remove(persistenceId);
+ if(blockLatch != null) {
+ Uninterruptibles.awaitUninterruptibly(blockLatch);
+ }
+
Map<Long, Object> journal = journals.get(persistenceId);
if(journal == null) {
return null;