BUG 2718 : Create a diagnostic utility to track append entries replies
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / InMemoryJournal.java
index c9a0eaf0337929408ec4a16d7c6479a4cf329f69..f340d1c305c5939993d09de8a4e84ef630791b1f 100644 (file)
@@ -7,10 +7,16 @@
  */
 package org.opendaylight.controller.cluster.datastore.utils;
 
+import static org.junit.Assert.assertEquals;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Uninterruptibles;
 import scala.concurrent.Future;
 import akka.dispatch.Futures;
 import akka.japi.Procedure;
@@ -22,7 +28,11 @@ import akka.persistence.journal.japi.AsyncWriteJournal;
 
 public class InMemoryJournal extends AsyncWriteJournal {
 
-    private static Map<String, Map<Long, Object>> journals = new ConcurrentHashMap<>();
+    private static final Map<String, Map<Long, Object>> journals = new ConcurrentHashMap<>();
+
+    private static final Map<String, CountDownLatch> deleteMessagesCompleteLatches = new ConcurrentHashMap<>();
+
+    private static final Map<String, CountDownLatch> blockReadMessagesLatches = new ConcurrentHashMap<>();
 
     public static void addEntry(String persistenceId, long sequenceNr, Object data) {
         Map<Long, Object> journal = journals.get(persistenceId);
@@ -31,28 +41,56 @@ public class InMemoryJournal extends AsyncWriteJournal {
             journals.put(persistenceId, journal);
         }
 
-        journal.put(sequenceNr, data);
+        synchronized (journal) {
+            journal.put(sequenceNr, data);
+        }
     }
 
     public static void clear() {
         journals.clear();
     }
 
+    public static Map<Long, Object> get(String persistenceId) {
+        Map<Long, Object> journal = journals.get(persistenceId);
+        return journal != null ? journal : Collections.<Long, Object>emptyMap();
+    }
+
+    public static void waitForDeleteMessagesComplete(String persistenceId) {
+        assertEquals("Recovery complete", true, Uninterruptibles.awaitUninterruptibly(
+                deleteMessagesCompleteLatches.get(persistenceId), 5, TimeUnit.SECONDS));
+    }
+
+    public static void addDeleteMessagesCompleteLatch(String persistenceId) {
+        deleteMessagesCompleteLatches.put(persistenceId, new CountDownLatch(1));
+    }
+
+    public static void addBlockReadMessagesLatch(String persistenceId, CountDownLatch latch) {
+        blockReadMessagesLatches.put(persistenceId, latch);
+    }
+
     @Override
     public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr,
             long toSequenceNr, long max, final Procedure<PersistentRepr> replayCallback) {
         return Futures.future(new Callable<Void>() {
             @Override
             public Void call() throws Exception {
+                CountDownLatch blockLatch = blockReadMessagesLatches.remove(persistenceId);
+                if(blockLatch != null) {
+                    Uninterruptibles.awaitUninterruptibly(blockLatch);
+                }
+
                 Map<Long, Object> journal = journals.get(persistenceId);
                 if(journal == null) {
                     return null;
                 }
 
-                for (Map.Entry<Long,Object> entry : journal.entrySet()) {
-                    PersistentRepr persistentMessage =
-                        new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId, false, null, null);
-                    replayCallback.apply(persistentMessage);
+                synchronized (journal) {
+                    for (Map.Entry<Long,Object> entry : journal.entrySet()) {
+                        PersistentRepr persistentMessage =
+                                new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId,
+                                        false, null, null);
+                        replayCallback.apply(persistentMessage);
+                    }
                 }
 
                 return null;
@@ -62,12 +100,28 @@ public class InMemoryJournal extends AsyncWriteJournal {
 
     @Override
     public Future<Long> doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) {
-        return Futures.successful(new Long(0));
+        return Futures.successful(-1L);
     }
 
     @Override
-    public Future<Void> doAsyncWriteMessages(Iterable<PersistentRepr> messages) {
-        return Futures.successful(null);
+    public Future<Void> doAsyncWriteMessages(final Iterable<PersistentRepr> messages) {
+        return Futures.future(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                for (PersistentRepr repr : messages) {
+                    Map<Long, Object> journal = journals.get(repr.persistenceId());
+                    if(journal == null) {
+                        journal = Maps.newLinkedHashMap();
+                        journals.put(repr.persistenceId(), journal);
+                    }
+
+                    synchronized (journal) {
+                        journal.put(repr.sequenceNr(), repr.payload());
+                    }
+                }
+                return null;
+            }
+        }, context().dispatcher());
     }
 
     @Override
@@ -82,6 +136,24 @@ public class InMemoryJournal extends AsyncWriteJournal {
 
     @Override
     public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) {
+        Map<Long, Object> journal = journals.get(persistenceId);
+        if(journal != null) {
+            synchronized (journal) {
+                Iterator<Long> iter = journal.keySet().iterator();
+                while(iter.hasNext()) {
+                    Long n = iter.next();
+                    if(n <= toSequenceNr) {
+                        iter.remove();
+                    }
+                }
+            }
+        }
+
+        CountDownLatch latch = deleteMessagesCompleteLatches.get(persistenceId);
+        if(latch != null) {
+            latch.countDown();
+        }
+
         return Futures.successful(null);
     }
 }