X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Futils%2FInMemoryJournal.java;h=0e2449e4b8e62d7d8cd4af3d4bd33407136460e2;hp=d482e28401dd3ef0edf87eebb1a9ca83eafbeb7f;hb=2f7c93174d7834a4c4aedacc9b88aa53a5a0422c;hpb=864f3c5bb156a7f6f6e2d91a6c7d43e916909e84 diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java index d482e28401..0e2449e4b8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java @@ -36,13 +36,23 @@ import scala.concurrent.Future; */ public class InMemoryJournal extends AsyncWriteJournal { + private static class WriteMessagesComplete { + final CountDownLatch latch; + final Class ofType; + + public WriteMessagesComplete(int count, Class ofType) { + this.latch = new CountDownLatch(count); + this.ofType = ofType; + } + } + static final Logger LOG = LoggerFactory.getLogger(InMemoryJournal.class); private static final Map> journals = new ConcurrentHashMap<>(); private static final Map deleteMessagesCompleteLatches = new ConcurrentHashMap<>(); - private static final Map writeMessagesCompleteLatches = new ConcurrentHashMap<>(); + private static final Map writeMessagesComplete = new ConcurrentHashMap<>(); private static final Map blockReadMessagesLatches = new ConcurrentHashMap<>(); @@ -107,7 +117,7 @@ public class InMemoryJournal extends AsyncWriteJournal { } public static void waitForWriteMessagesComplete(String persistenceId) { - if(!Uninterruptibles.awaitUninterruptibly(writeMessagesCompleteLatches.get(persistenceId), 5, TimeUnit.SECONDS)) { + if(!Uninterruptibles.awaitUninterruptibly(writeMessagesComplete.get(persistenceId).latch, 5, TimeUnit.SECONDS)) { throw new AssertionError("Journal write messages did not complete"); } } @@ -117,7 +127,11 @@ public class InMemoryJournal extends AsyncWriteJournal { } public static void addWriteMessagesCompleteLatch(String persistenceId, int count) { - writeMessagesCompleteLatches.put(persistenceId, new CountDownLatch(count)); + writeMessagesComplete.put(persistenceId, new WriteMessagesComplete(count, null)); + } + + public static void addWriteMessagesCompleteLatch(String persistenceId, int count, Class ofType) { + writeMessagesComplete.put(persistenceId, new WriteMessagesComplete(count, ofType)); } public static void addBlockReadMessagesLatch(String persistenceId, CountDownLatch latch) { @@ -193,9 +207,11 @@ public class InMemoryJournal extends AsyncWriteJournal { journal.put(repr.sequenceNr(), repr.payload()); } - CountDownLatch latch = writeMessagesCompleteLatches.get(repr.persistenceId()); - if(latch != null) { - latch.countDown(); + WriteMessagesComplete complete = writeMessagesComplete.get(repr.persistenceId()); + if(complete != null) { + if(complete.ofType == null || complete.ofType.equals(repr.payload().getClass())) { + complete.latch.countDown(); + } } }