Bug 2055: Handle Tx create in TransactionProxy resiliently
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / InMemoryJournal.java
index 34867530821d70d4ce7085859a3f79e59a6d9a03..f340d1c305c5939993d09de8a4e84ef630791b1f 100644 (file)
@@ -32,6 +32,8 @@ public class InMemoryJournal extends AsyncWriteJournal {
 
     private static final Map<String, CountDownLatch> deleteMessagesCompleteLatches = new ConcurrentHashMap<>();
 
+    private static final Map<String, CountDownLatch> blockReadMessagesLatches = new ConcurrentHashMap<>();
+
     public static void addEntry(String persistenceId, long sequenceNr, Object data) {
         Map<Long, Object> journal = journals.get(persistenceId);
         if(journal == null) {
@@ -62,12 +64,21 @@ public class InMemoryJournal extends AsyncWriteJournal {
         deleteMessagesCompleteLatches.put(persistenceId, new CountDownLatch(1));
     }
 
+    public static void addBlockReadMessagesLatch(String persistenceId, CountDownLatch latch) {
+        blockReadMessagesLatches.put(persistenceId, latch);
+    }
+
     @Override
     public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr,
             long toSequenceNr, long max, final Procedure<PersistentRepr> replayCallback) {
         return Futures.future(new Callable<Void>() {
             @Override
             public Void call() throws Exception {
+                CountDownLatch blockLatch = blockReadMessagesLatches.remove(persistenceId);
+                if(blockLatch != null) {
+                    Uninterruptibles.awaitUninterruptibly(blockLatch);
+                }
+
                 Map<Long, Object> journal = journals.get(persistenceId);
                 if(journal == null) {
                     return null;