*/
package org.opendaylight.controller.cluster.datastore.utils;
+import static org.junit.Assert.assertEquals;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Uninterruptibles;
import scala.concurrent.Future;
import akka.dispatch.Futures;
import akka.japi.Procedure;
public class InMemoryJournal extends AsyncWriteJournal {
- private static Map<String, Map<Long, Object>> journals = new ConcurrentHashMap<>();
+ private static final Map<String, Map<Long, Object>> journals = new ConcurrentHashMap<>();
+
+ private static final Map<String, CountDownLatch> deleteMessagesCompleteLatches = new ConcurrentHashMap<>();
+
+ private static final Map<String, CountDownLatch> blockReadMessagesLatches = new ConcurrentHashMap<>();
public static void addEntry(String persistenceId, long sequenceNr, Object data) {
Map<Long, Object> journal = journals.get(persistenceId);
journals.put(persistenceId, journal);
}
- journal.put(sequenceNr, data);
+ synchronized (journal) {
+ journal.put(sequenceNr, data);
+ }
}
public static void clear() {
journals.clear();
}
+ public static Map<Long, Object> get(String persistenceId) {
+ Map<Long, Object> journal = journals.get(persistenceId);
+ return journal != null ? journal : Collections.<Long, Object>emptyMap();
+ }
+
+ public static void waitForDeleteMessagesComplete(String persistenceId) {
+ assertEquals("Recovery complete", true, Uninterruptibles.awaitUninterruptibly(
+ deleteMessagesCompleteLatches.get(persistenceId), 5, TimeUnit.SECONDS));
+ }
+
+ public static void addDeleteMessagesCompleteLatch(String persistenceId) {
+ deleteMessagesCompleteLatches.put(persistenceId, new CountDownLatch(1));
+ }
+
+ public static void addBlockReadMessagesLatch(String persistenceId, CountDownLatch latch) {
+ blockReadMessagesLatches.put(persistenceId, latch);
+ }
+
@Override
public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr,
long toSequenceNr, long max, final Procedure<PersistentRepr> replayCallback) {
return Futures.future(new Callable<Void>() {
@Override
public Void call() throws Exception {
+ CountDownLatch blockLatch = blockReadMessagesLatches.remove(persistenceId);
+ if(blockLatch != null) {
+ Uninterruptibles.awaitUninterruptibly(blockLatch);
+ }
+
Map<Long, Object> journal = journals.get(persistenceId);
if(journal == null) {
return null;
}
- for (Map.Entry<Long,Object> entry : journal.entrySet()) {
- PersistentRepr persistentMessage =
- new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId, false, null, null);
- replayCallback.apply(persistentMessage);
+ synchronized (journal) {
+ for (Map.Entry<Long,Object> entry : journal.entrySet()) {
+ PersistentRepr persistentMessage =
+ new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId,
+ false, null, null);
+ replayCallback.apply(persistentMessage);
+ }
}
return null;
@Override
public Future<Long> doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) {
- return Futures.successful(new Long(0));
+ return Futures.successful(-1L);
}
@Override
- public Future<Void> doAsyncWriteMessages(Iterable<PersistentRepr> messages) {
- return Futures.successful(null);
+ public Future<Void> doAsyncWriteMessages(final Iterable<PersistentRepr> messages) {
+ return Futures.future(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ for (PersistentRepr repr : messages) {
+ Map<Long, Object> journal = journals.get(repr.persistenceId());
+ if(journal == null) {
+ journal = Maps.newLinkedHashMap();
+ journals.put(repr.persistenceId(), journal);
+ }
+
+ synchronized (journal) {
+ journal.put(repr.sequenceNr(), repr.payload());
+ }
+ }
+ return null;
+ }
+ }, context().dispatcher());
}
@Override
@Override
public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) {
+ Map<Long, Object> journal = journals.get(persistenceId);
+ if(journal != null) {
+ synchronized (journal) {
+ Iterator<Long> iter = journal.keySet().iterator();
+ while(iter.hasNext()) {
+ Long n = iter.next();
+ if(n <= toSequenceNr) {
+ iter.remove();
+ }
+ }
+ }
+ }
+
+ CountDownLatch latch = deleteMessagesCompleteLatches.get(persistenceId);
+ if(latch != null) {
+ latch.countDown();
+ }
+
return Futures.successful(null);
}
}