X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Futils%2FInMemoryJournal.java;h=f340d1c305c5939993d09de8a4e84ef630791b1f;hb=1e25f44b70aab9998f33b2136a0cf628821cd2dc;hp=c9a0eaf0337929408ec4a16d7c6479a4cf329f69;hpb=b3e553ce5b3d3e972cbe19465ab7af2fcb39934c;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemoryJournal.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemoryJournal.java index c9a0eaf033..f340d1c305 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemoryJournal.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemoryJournal.java @@ -7,10 +7,16 @@ */ package org.opendaylight.controller.cluster.datastore.utils; +import static org.junit.Assert.assertEquals; +import java.util.Collections; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Uninterruptibles; import scala.concurrent.Future; import akka.dispatch.Futures; import akka.japi.Procedure; @@ -22,7 +28,11 @@ import akka.persistence.journal.japi.AsyncWriteJournal; public class InMemoryJournal extends AsyncWriteJournal { - private static Map> journals = new ConcurrentHashMap<>(); + private static final Map> journals = new ConcurrentHashMap<>(); + + private static final Map deleteMessagesCompleteLatches = new ConcurrentHashMap<>(); + + private static final Map blockReadMessagesLatches = new ConcurrentHashMap<>(); public static void addEntry(String persistenceId, long sequenceNr, Object data) { Map journal = journals.get(persistenceId); @@ -31,28 +41,56 @@ public class InMemoryJournal extends AsyncWriteJournal { journals.put(persistenceId, journal); } - journal.put(sequenceNr, data); + synchronized (journal) { + journal.put(sequenceNr, data); + } } public static void clear() { journals.clear(); } + public static Map get(String persistenceId) { + Map journal = journals.get(persistenceId); + return journal != null ? journal : Collections.emptyMap(); + } + + public static void waitForDeleteMessagesComplete(String persistenceId) { + assertEquals("Recovery complete", true, Uninterruptibles.awaitUninterruptibly( + deleteMessagesCompleteLatches.get(persistenceId), 5, TimeUnit.SECONDS)); + } + + public static void addDeleteMessagesCompleteLatch(String persistenceId) { + deleteMessagesCompleteLatches.put(persistenceId, new CountDownLatch(1)); + } + + public static void addBlockReadMessagesLatch(String persistenceId, CountDownLatch latch) { + blockReadMessagesLatches.put(persistenceId, latch); + } + @Override public Future doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, long toSequenceNr, long max, final Procedure replayCallback) { return Futures.future(new Callable() { @Override public Void call() throws Exception { + CountDownLatch blockLatch = blockReadMessagesLatches.remove(persistenceId); + if(blockLatch != null) { + Uninterruptibles.awaitUninterruptibly(blockLatch); + } + Map journal = journals.get(persistenceId); if(journal == null) { return null; } - for (Map.Entry entry : journal.entrySet()) { - PersistentRepr persistentMessage = - new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId, false, null, null); - replayCallback.apply(persistentMessage); + synchronized (journal) { + for (Map.Entry entry : journal.entrySet()) { + PersistentRepr persistentMessage = + new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId, + false, null, null); + replayCallback.apply(persistentMessage); + } } return null; @@ -62,12 +100,28 @@ public class InMemoryJournal extends AsyncWriteJournal { @Override public Future doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) { - return Futures.successful(new Long(0)); + return Futures.successful(-1L); } @Override - public Future doAsyncWriteMessages(Iterable messages) { - return Futures.successful(null); + public Future doAsyncWriteMessages(final Iterable messages) { + return Futures.future(new Callable() { + @Override + public Void call() throws Exception { + for (PersistentRepr repr : messages) { + Map journal = journals.get(repr.persistenceId()); + if(journal == null) { + journal = Maps.newLinkedHashMap(); + journals.put(repr.persistenceId(), journal); + } + + synchronized (journal) { + journal.put(repr.sequenceNr(), repr.payload()); + } + } + return null; + } + }, context().dispatcher()); } @Override @@ -82,6 +136,24 @@ public class InMemoryJournal extends AsyncWriteJournal { @Override public Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) { + Map journal = journals.get(persistenceId); + if(journal != null) { + synchronized (journal) { + Iterator iter = journal.keySet().iterator(); + while(iter.hasNext()) { + Long n = iter.next(); + if(n <= toSequenceNr) { + iter.remove(); + } + } + } + } + + CountDownLatch latch = deleteMessagesCompleteLatches.get(persistenceId); + if(latch != null) { + latch.countDown(); + } + return Futures.successful(null); } }