X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Futils%2FInMemoryJournal.java;h=790952883d1b83ea02e2335d6f029454c86f063a;hb=refs%2Fchanges%2F24%2F32524%2F19;hp=0737d75a7f2d679ae240c996c87cbd882052da28;hpb=d36d8d28eaf7e4cc9ac0bd2972e11346819d4c3c;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java index 0737d75a7f..790952883d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java @@ -8,23 +8,25 @@ package org.opendaylight.controller.cluster.raft.utils; import akka.dispatch.Futures; -import akka.japi.Procedure; -import akka.persistence.PersistentConfirmation; -import akka.persistence.PersistentId; +import akka.persistence.AtomicWrite; import akka.persistence.PersistentImpl; import akka.persistence.PersistentRepr; import akka.persistence.journal.japi.AsyncWriteJournal; import com.google.common.collect.Maps; import com.google.common.util.concurrent.Uninterruptibles; +import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import org.apache.commons.lang.SerializationUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; @@ -36,16 +38,30 @@ import scala.concurrent.Future; */ public class InMemoryJournal extends AsyncWriteJournal { + private static class WriteMessagesComplete { + final CountDownLatch latch; + final Class ofType; + + public WriteMessagesComplete(int count, Class ofType) { + this.latch = new CountDownLatch(count); + this.ofType = ofType; + } + } + static final Logger LOG = LoggerFactory.getLogger(InMemoryJournal.class); private static final Map> journals = new ConcurrentHashMap<>(); private static final Map deleteMessagesCompleteLatches = new ConcurrentHashMap<>(); - private static final Map writeMessagesCompleteLatches = new ConcurrentHashMap<>(); + private static final Map writeMessagesComplete = new ConcurrentHashMap<>(); private static final Map blockReadMessagesLatches = new ConcurrentHashMap<>(); + private static Object deserialize(Object data) { + return data instanceof byte[] ? SerializationUtils.deserialize((byte[])data) : data; + } + public static void addEntry(String persistenceId, long sequenceNr, Object data) { Map journal = journals.get(persistenceId); if(journal == null) { @@ -54,7 +70,8 @@ public class InMemoryJournal extends AsyncWriteJournal { } synchronized (journal) { - journal.put(sequenceNr, data); + journal.put(sequenceNr, data instanceof Serializable ? + SerializationUtils.serialize((Serializable) data) : data); } } @@ -72,8 +89,9 @@ public class InMemoryJournal extends AsyncWriteJournal { synchronized (journalMap) { List journal = new ArrayList<>(journalMap.size()); for(Object entry: journalMap.values()) { - if(type.isInstance(entry)) { - journal.add((T) entry); + Object data = deserialize(entry); + if(type.isInstance(data)) { + journal.add((T) data); } } @@ -107,7 +125,7 @@ public class InMemoryJournal extends AsyncWriteJournal { } public static void waitForWriteMessagesComplete(String persistenceId) { - if(!Uninterruptibles.awaitUninterruptibly(writeMessagesCompleteLatches.get(persistenceId), 5, TimeUnit.SECONDS)) { + if(!Uninterruptibles.awaitUninterruptibly(writeMessagesComplete.get(persistenceId).latch, 5, TimeUnit.SECONDS)) { throw new AssertionError("Journal write messages did not complete"); } } @@ -117,7 +135,11 @@ public class InMemoryJournal extends AsyncWriteJournal { } public static void addWriteMessagesCompleteLatch(String persistenceId, int count) { - writeMessagesCompleteLatches.put(persistenceId, new CountDownLatch(count)); + writeMessagesComplete.put(persistenceId, new WriteMessagesComplete(count, null)); + } + + public static void addWriteMessagesCompleteLatch(String persistenceId, int count, Class ofType) { + writeMessagesComplete.put(persistenceId, new WriteMessagesComplete(count, ofType)); } public static void addBlockReadMessagesLatch(String persistenceId, CountDownLatch latch) { @@ -125,8 +147,10 @@ public class InMemoryJournal extends AsyncWriteJournal { } @Override - public Future doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, - long toSequenceNr, long max, final Procedure replayCallback) { + public Future doAsyncReplayMessages(final String persistenceId, final long fromSequenceNr, + final long toSequenceNr, final long max, final Consumer replayCallback) { + LOG.trace("doAsyncReplayMessages for {}: fromSequenceNr: {}, toSequenceNr: {}", persistenceId, + fromSequenceNr,toSequenceNr); return Futures.future(new Callable() { @Override public Void call() throws Exception { @@ -136,16 +160,19 @@ public class InMemoryJournal extends AsyncWriteJournal { } Map journal = journals.get(persistenceId); - if(journal == null) { + if (journal == null) { return null; } synchronized (journal) { + int count = 0; for (Map.Entry entry : journal.entrySet()) { - PersistentRepr persistentMessage = - new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId, - false, null, null); - replayCallback.apply(persistentMessage); + if (++count <= max && entry.getKey() >= fromSequenceNr && entry.getKey() <= toSequenceNr) { + PersistentRepr persistentMessage = + new PersistentImpl(deserialize(entry.getValue()), entry.getKey(), persistenceId, + null, false, null, null); + replayCallback.accept(persistentMessage); + } } } @@ -156,11 +183,12 @@ public class InMemoryJournal extends AsyncWriteJournal { @Override public Future doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) { - // Akka calls this during recovery. + LOG.trace("doAsyncReadHighestSequenceNr for {}: fromSequenceNr: {}", persistenceId, fromSequenceNr); + // Akka calls this during recovery. Map journal = journals.get(persistenceId); if(journal == null) { - return Futures.successful(-1L); + return Futures.successful(fromSequenceNr); } synchronized (journal) { @@ -176,46 +204,37 @@ public class InMemoryJournal extends AsyncWriteJournal { } @Override - public Future doAsyncWriteMessages(final Iterable messages) { - return Futures.future(new Callable() { + public Future>> doAsyncWriteMessages(final Iterable messages) { + return Futures.future(new Callable>>() { @Override - public Void call() throws Exception { - for (PersistentRepr repr : messages) { - Map journal = journals.get(repr.persistenceId()); - if(journal == null) { - journal = Maps.newLinkedHashMap(); - journals.put(repr.persistenceId(), journal); - } - - synchronized (journal) { + public Iterable> call() throws Exception { + for (AtomicWrite write : messages) { + // Copy to array - workaround for eclipse "ambiguous method" errors for toIterator, toIterable etc + PersistentRepr[] array = new PersistentRepr[write.payload().size()]; + write.payload().copyToArray(array); + for(PersistentRepr repr: array) { LOG.trace("doAsyncWriteMessages: id: {}: seqNr: {}, payload: {}", repr.persistenceId(), - repr.sequenceNr(), repr.payload()); - journal.put(repr.sequenceNr(), repr.payload()); - } + repr.sequenceNr(), repr.payload()); + + addEntry(repr.persistenceId(), repr.sequenceNr(), repr.payload()); - CountDownLatch latch = writeMessagesCompleteLatches.get(repr.persistenceId()); - if(latch != null) { - latch.countDown(); + WriteMessagesComplete complete = writeMessagesComplete.get(repr.persistenceId()); + if(complete != null) { + if(complete.ofType == null || complete.ofType.equals(repr.payload().getClass())) { + complete.latch.countDown(); + } + } } } - return null; + return Collections.emptyList(); } }, context().dispatcher()); } @Override - public Future doAsyncWriteConfirmations(Iterable confirmations) { - return Futures.successful(null); - } - - @Override - public Future doAsyncDeleteMessages(Iterable messageIds, boolean permanent) { - return Futures.successful(null); - } - - @Override - public Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) { + public Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) { + LOG.trace("doAsyncDeleteMessagesTo: {}", toSequenceNr); Map journal = journals.get(persistenceId); if(journal != null) { synchronized (journal) {