X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Futils%2FInMemoryJournal.java;h=8c6cf0945713ed0cbae3d5f7fbb2870220985dc5;hp=d482e28401dd3ef0edf87eebb1a9ca83eafbeb7f;hb=fa96da71c5ab10973a9f93c2e8b35494b96fd7ed;hpb=f9a9cd1ea40d2477ccb16b03c71a87595226595a

diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java
index d482e28401..8c6cf09457 100644
--- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java
+++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java
@@ -8,23 +8,24 @@ package org.opendaylight.controller.cluster.raft.utils;
 
 import akka.dispatch.Futures;
-import akka.japi.Procedure;
-import akka.persistence.PersistentConfirmation;
-import akka.persistence.PersistentId;
+import akka.persistence.AtomicWrite;
 import akka.persistence.PersistentImpl;
 import akka.persistence.PersistentRepr;
 import akka.persistence.journal.japi.AsyncWriteJournal;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.apache.commons.lang.SerializationUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
@@ -36,44 +37,60 @@ import scala.concurrent.Future;
  */
 public class InMemoryJournal extends AsyncWriteJournal {
 
+    private static class WriteMessagesComplete {
+        final CountDownLatch latch;
+        final Class<?> ofType;
+
+        WriteMessagesComplete(int count, Class<?> ofType) {
+            this.latch = new CountDownLatch(count);
+            this.ofType = ofType;
+        }
+    }
+
     static final Logger LOG = LoggerFactory.getLogger(InMemoryJournal.class);
 
-    private static final Map<String, Map<Long, Object>> journals = new ConcurrentHashMap<>();
+    private static final Map<String, Map<Long, Object>> JOURNALS = new ConcurrentHashMap<>();
 
-    private static final Map<String, CountDownLatch> deleteMessagesCompleteLatches = new ConcurrentHashMap<>();
+    private static final Map<String, CountDownLatch> DELETE_MESSAGES_COMPLETE_LATCHES = new ConcurrentHashMap<>();
 
-    private static final Map<String, CountDownLatch> writeMessagesCompleteLatches = new ConcurrentHashMap<>();
+    private static final Map<String, WriteMessagesComplete> WRITE_MESSAGES_COMPLETE = new ConcurrentHashMap<>();
 
-    private static final Map<String, CountDownLatch> blockReadMessagesLatches = new ConcurrentHashMap<>();
+    private static final Map<String, CountDownLatch> BLOCK_READ_MESSAGES_LATCHES = new ConcurrentHashMap<>();
+
+    private static Object deserialize(Object data) {
+        return data instanceof byte[] ? SerializationUtils.deserialize((byte[])data) : data;
+    }
 
     public static void addEntry(String persistenceId, long sequenceNr, Object data) {
-        Map<Long, Object> journal = journals.get(persistenceId);
-        if(journal == null) {
+        Map<Long, Object> journal = JOURNALS.get(persistenceId);
+        if (journal == null) {
             journal = Maps.newLinkedHashMap();
-            journals.put(persistenceId, journal);
+            JOURNALS.put(persistenceId, journal);
         }
 
         synchronized (journal) {
-            journal.put(sequenceNr, data);
+            journal.put(sequenceNr, data instanceof Serializable
+                    ? SerializationUtils.serialize((Serializable) data) : data);
         }
     }
 
     public static void clear() {
-        journals.clear();
+        JOURNALS.clear();
     }
 
     @SuppressWarnings("unchecked")
     public static <T> List<T> get(String persistenceId, Class<T> type) {
-        Map<Long, Object> journalMap = journals.get(persistenceId);
-        if(journalMap == null) {
+        Map<Long, Object> journalMap = JOURNALS.get(persistenceId);
+        if (journalMap == null) {
             return Collections.emptyList();
         }
 
         synchronized (journalMap) {
             List<T> journal = new ArrayList<>(journalMap.size());
-            for(Object entry: journalMap.values()) {
-                if(type.isInstance(entry)) {
-                    journal.add((T) entry);
+            for (Object entry: journalMap.values()) {
+                Object data = deserialize(entry);
+                if (type.isInstance(data)) {
+                    journal.add((T) data);
                 }
             }
 
@@ -82,17 +99,17 @@ public class InMemoryJournal extends AsyncWriteJournal {
     }
 
     public static Map<Long, Object> get(String persistenceId) {
-        Map<Long, Object> journalMap = journals.get(persistenceId);
+        Map<Long, Object> journalMap = JOURNALS.get(persistenceId);
         return journalMap != null ? journalMap : Collections.emptyMap();
     }
 
     public static void dumpJournal(String persistenceId) {
         StringBuilder builder = new StringBuilder(String.format("Journal log for %s:", persistenceId));
-        Map<Long, Object> journalMap = journals.get(persistenceId);
-        if(journalMap != null) {
+        Map<Long, Object> journalMap = JOURNALS.get(persistenceId);
+        if (journalMap != null) {
             synchronized (journalMap) {
-                for(Map.Entry<Long, Object> e: journalMap.entrySet()) {
-                    builder.append("\n ").append(e.getKey()).append(" = ").append(e.getValue());
+                for (Map.Entry<Long, Object> e: journalMap.entrySet()) {
+                    builder.append("\n ").append(e.getKey()).append(" = ").append(deserialize(e.getValue()));
                 }
             }
         }
@@ -101,72 +118,81 @@ public class InMemoryJournal extends AsyncWriteJournal {
     }
 
     public static void waitForDeleteMessagesComplete(String persistenceId) {
-        if(!Uninterruptibles.awaitUninterruptibly(deleteMessagesCompleteLatches.get(persistenceId), 5, TimeUnit.SECONDS)) {
+        if (!Uninterruptibles.awaitUninterruptibly(DELETE_MESSAGES_COMPLETE_LATCHES.get(persistenceId),
+                5, TimeUnit.SECONDS)) {
             throw new AssertionError("Delete messages did not complete");
         }
     }
 
     public static void waitForWriteMessagesComplete(String persistenceId) {
-        if(!Uninterruptibles.awaitUninterruptibly(writeMessagesCompleteLatches.get(persistenceId), 5, TimeUnit.SECONDS)) {
+        if (!Uninterruptibles.awaitUninterruptibly(WRITE_MESSAGES_COMPLETE.get(persistenceId).latch,
+                5, TimeUnit.SECONDS)) {
             throw new AssertionError("Journal write messages did not complete");
         }
     }
 
     public static void addDeleteMessagesCompleteLatch(String persistenceId) {
-        deleteMessagesCompleteLatches.put(persistenceId, new CountDownLatch(1));
+        DELETE_MESSAGES_COMPLETE_LATCHES.put(persistenceId, new CountDownLatch(1));
    }
 
     public static void addWriteMessagesCompleteLatch(String persistenceId, int count) {
-        writeMessagesCompleteLatches.put(persistenceId, new CountDownLatch(count));
+        WRITE_MESSAGES_COMPLETE.put(persistenceId, new WriteMessagesComplete(count, null));
+    }
+
+    public static void addWriteMessagesCompleteLatch(String persistenceId, int count, Class<?> ofType) {
+        WRITE_MESSAGES_COMPLETE.put(persistenceId, new WriteMessagesComplete(count, ofType));
     }
 
     public static void addBlockReadMessagesLatch(String persistenceId, CountDownLatch latch) {
-        blockReadMessagesLatches.put(persistenceId, latch);
+        BLOCK_READ_MESSAGES_LATCHES.put(persistenceId, latch);
    }
 
     @Override
-    public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr,
-            long toSequenceNr, long max, final Procedure<PersistentRepr> replayCallback) {
-        return Futures.future(new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
-                CountDownLatch blockLatch = blockReadMessagesLatches.remove(persistenceId);
-                if(blockLatch != null) {
-                    Uninterruptibles.awaitUninterruptibly(blockLatch);
-                }
+    public Future<Void> doAsyncReplayMessages(final String persistenceId, final long fromSequenceNr,
+            final long toSequenceNr, final long max, final Consumer<PersistentRepr> replayCallback) {
+        LOG.trace("doAsyncReplayMessages for {}: fromSequenceNr: {}, toSequenceNr: {}", persistenceId,
+                fromSequenceNr,toSequenceNr);
+        return Futures.future(() -> {
+            CountDownLatch blockLatch = BLOCK_READ_MESSAGES_LATCHES.remove(persistenceId);
+            if (blockLatch != null) {
+                Uninterruptibles.awaitUninterruptibly(blockLatch);
+            }
 
-                Map<Long, Object> journal = journals.get(persistenceId);
-                if(journal == null) {
-                    return null;
-                }
+            Map<Long, Object> journal = JOURNALS.get(persistenceId);
+            if (journal == null) {
+                return null;
+            }
 
-                synchronized (journal) {
-                    for (Map.Entry<Long, Object> entry : journal.entrySet()) {
+            synchronized (journal) {
+                int count = 0;
+                for (Map.Entry<Long, Object> entry : journal.entrySet()) {
+                    if (++count <= max && entry.getKey() >= fromSequenceNr && entry.getKey() <= toSequenceNr) {
                         PersistentRepr persistentMessage =
-                                new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId,
-                                        false, null, null);
-                        replayCallback.apply(persistentMessage);
+                                new PersistentImpl(deserialize(entry.getValue()), entry.getKey(), persistenceId,
+                                        null, false, null, null);
+                        replayCallback.accept(persistentMessage);
                     }
                 }
-
-                return null;
             }
+
+            return null;
        }, context().dispatcher());
    }
 
     @Override
     public Future<Long> doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) {
-        // Akka calls this during recovery.
+        LOG.trace("doAsyncReadHighestSequenceNr for {}: fromSequenceNr: {}", persistenceId, fromSequenceNr);
 
-        Map<Long, Object> journal = journals.get(persistenceId);
-        if(journal == null) {
-            return Futures.successful(-1L);
+        // Akka calls this during recovery.
+        Map<Long, Object> journal = JOURNALS.get(persistenceId);
+        if (journal == null) {
+            return Futures.successful(fromSequenceNr);
        }
 
         synchronized (journal) {
             long highest = -1;
             for (Long seqNr : journal.keySet()) {
-                if(seqNr.longValue() >= fromSequenceNr && seqNr.longValue() > highest) {
+                if (seqNr.longValue() >= fromSequenceNr && seqNr.longValue() > highest) {
                     highest = seqNr.longValue();
                 }
             }
@@ -176,62 +202,49 @@ public class InMemoryJournal extends AsyncWriteJournal {
     }
 
     @Override
-    public Future<Void> doAsyncWriteMessages(final Iterable<PersistentRepr> messages) {
-        return Futures.future(new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
-                for (PersistentRepr repr : messages) {
-                    Map<Long, Object> journal = journals.get(repr.persistenceId());
-                    if(journal == null) {
-                        journal = Maps.newLinkedHashMap();
-                        journals.put(repr.persistenceId(), journal);
-                    }
-
-                    synchronized (journal) {
-                        LOG.trace("doAsyncWriteMessages: id: {}: seqNr: {}, payload: {}", repr.persistenceId(),
-                                repr.sequenceNr(), repr.payload());
-                        journal.put(repr.sequenceNr(), repr.payload());
-                    }
-
-                    CountDownLatch latch = writeMessagesCompleteLatches.get(repr.persistenceId());
-                    if(latch != null) {
-                        latch.countDown();
+    public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(final Iterable<AtomicWrite> messages) {
+        return Futures.future(() -> {
+            for (AtomicWrite write : messages) {
+                // Copy to array - workaround for eclipse "ambiguous method" errors for toIterator, toIterable etc
+                PersistentRepr[] array = new PersistentRepr[write.payload().size()];
+                write.payload().copyToArray(array);
+                for (PersistentRepr repr: array) {
+                    LOG.trace("doAsyncWriteMessages: id: {}: seqNr: {}, payload: {}", repr.persistenceId(),
+                            repr.sequenceNr(), repr.payload());
+
+                    addEntry(repr.persistenceId(), repr.sequenceNr(), repr.payload());
+
+                    WriteMessagesComplete complete = WRITE_MESSAGES_COMPLETE.get(repr.persistenceId());
+                    if (complete != null) {
+                        if (complete.ofType == null || complete.ofType.equals(repr.payload().getClass())) {
+                            complete.latch.countDown();
+                        }
                     }
                 }
-
-                return null;
             }
-        }, context().dispatcher());
-    }
-
-    @Override
-    public Future<Void> doAsyncWriteConfirmations(Iterable<PersistentConfirmation> confirmations) {
-        return Futures.successful(null);
-    }
 
-    @Override
-    public Future<Void> doAsyncDeleteMessages(Iterable<PersistentId> messageIds, boolean permanent) {
-        return Futures.successful(null);
+            return Collections.emptyList();
+        }, context().dispatcher());
     }
 
     @Override
-    public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) {
+    public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) {
         LOG.trace("doAsyncDeleteMessagesTo: {}", toSequenceNr);
-        Map<Long, Object> journal = journals.get(persistenceId);
-        if(journal != null) {
+        Map<Long, Object> journal = JOURNALS.get(persistenceId);
+        if (journal != null) {
             synchronized (journal) {
                 Iterator<Long> iter = journal.keySet().iterator();
-                while(iter.hasNext()) {
-                    Long n = iter.next();
-                    if(n <= toSequenceNr) {
+                while (iter.hasNext()) {
+                    Long num = iter.next();
+                    if (num <= toSequenceNr) {
                         iter.remove();
                     }
                 }
             }
         }
 
-        CountDownLatch latch = deleteMessagesCompleteLatches.get(persistenceId);
-        if(latch != null) {
+        CountDownLatch latch = DELETE_MESSAGES_COMPLETE_LATCHES.get(persistenceId);
+        if (latch != null) {
             latch.countDown();
        }
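
Note: a rough sketch of how a test might drive this in-memory journal through the static helpers shown in the patch above. The persistence id "member-1" and the String payloads are illustrative placeholders, not taken from this change, and the actor-under-test setup is elided.

    // Reset shared journal state between tests.
    InMemoryJournal.clear();

    // Seed entries that the actor under test will see during recovery.
    InMemoryJournal.addEntry("member-1", 1, "first-entry");
    InMemoryJournal.addEntry("member-1", 2, "second-entry");

    // Expect two journal writes of any payload type from the actor under test.
    InMemoryJournal.addWriteMessagesCompleteLatch("member-1", 2);

    // ... start the actor under test and let it persist two messages ...

    // Blocks up to 5 seconds until doAsyncWriteMessages counts the latch down.
    InMemoryJournal.waitForWriteMessagesComplete("member-1");

    // Retrieve persisted payloads of a given type (deserialized transparently).
    List<String> persisted = InMemoryJournal.get("member-1", String.class);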