X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Futils%2FInMemoryJournal.java;h=f2a216c821d43098ad3c43454a1111439c818756;hp=790952883d1b83ea02e2335d6f029454c86f063a;hb=e1eca73a5ae2ffae8dd78c6fe5281cd2f45d5ef3;hpb=8cc33e526b0f9ad956dcc96e57cff02679d643b3 diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java index 790952883d..f2a216c821 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java @@ -21,7 +21,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -42,7 +41,7 @@ public class InMemoryJournal extends AsyncWriteJournal { final CountDownLatch latch; final Class ofType; - public WriteMessagesComplete(int count, Class ofType) { + WriteMessagesComplete(int count, Class ofType) { this.latch = new CountDownLatch(count); this.ofType = ofType; } @@ -50,47 +49,47 @@ public class InMemoryJournal extends AsyncWriteJournal { static final Logger LOG = LoggerFactory.getLogger(InMemoryJournal.class); - private static final Map> journals = new ConcurrentHashMap<>(); + private static final Map> JOURNALS = new ConcurrentHashMap<>(); - private static final Map deleteMessagesCompleteLatches = new ConcurrentHashMap<>(); + private static final Map DELETE_MESSAGES_COMPLETE_LATCHES = new ConcurrentHashMap<>(); - private static final Map writeMessagesComplete = new ConcurrentHashMap<>(); + private static final Map WRITE_MESSAGES_COMPLETE = new ConcurrentHashMap<>(); - private static final Map blockReadMessagesLatches = new ConcurrentHashMap<>(); + private static final Map BLOCK_READ_MESSAGES_LATCHES = new ConcurrentHashMap<>(); private static Object deserialize(Object data) { return data instanceof byte[] ? SerializationUtils.deserialize((byte[])data) : data; } public static void addEntry(String persistenceId, long sequenceNr, Object data) { - Map journal = journals.get(persistenceId); - if(journal == null) { + Map journal = JOURNALS.get(persistenceId); + if (journal == null) { journal = Maps.newLinkedHashMap(); - journals.put(persistenceId, journal); + JOURNALS.put(persistenceId, journal); } synchronized (journal) { - journal.put(sequenceNr, data instanceof Serializable ? - SerializationUtils.serialize((Serializable) data) : data); + journal.put(sequenceNr, data instanceof Serializable + ? SerializationUtils.serialize((Serializable) data) : data); } } public static void clear() { - journals.clear(); + JOURNALS.clear(); } @SuppressWarnings("unchecked") public static List get(String persistenceId, Class type) { - Map journalMap = journals.get(persistenceId); - if(journalMap == null) { + Map journalMap = JOURNALS.get(persistenceId); + if (journalMap == null) { return Collections.emptyList(); } synchronized (journalMap) { List journal = new ArrayList<>(journalMap.size()); - for(Object entry: journalMap.values()) { + for (Object entry: journalMap.values()) { Object data = deserialize(entry); - if(type.isInstance(data)) { + if (type.isInstance(data)) { journal.add((T) data); } } @@ -100,16 +99,16 @@ public class InMemoryJournal extends AsyncWriteJournal { } public static Map get(String persistenceId) { - Map journalMap = journals.get(persistenceId); + Map journalMap = JOURNALS.get(persistenceId); return journalMap != null ? journalMap : Collections.emptyMap(); } public static void dumpJournal(String persistenceId) { StringBuilder builder = new StringBuilder(String.format("Journal log for %s:", persistenceId)); - Map journalMap = journals.get(persistenceId); - if(journalMap != null) { + Map journalMap = JOURNALS.get(persistenceId); + if (journalMap != null) { synchronized (journalMap) { - for(Map.Entry e: journalMap.entrySet()) { + for (Map.Entry e: journalMap.entrySet()) { builder.append("\n ").append(e.getKey()).append(" = ").append(e.getValue()); } } @@ -119,31 +118,33 @@ public class InMemoryJournal extends AsyncWriteJournal { } public static void waitForDeleteMessagesComplete(String persistenceId) { - if(!Uninterruptibles.awaitUninterruptibly(deleteMessagesCompleteLatches.get(persistenceId), 5, TimeUnit.SECONDS)) { + if (!Uninterruptibles.awaitUninterruptibly(DELETE_MESSAGES_COMPLETE_LATCHES.get(persistenceId), + 5, TimeUnit.SECONDS)) { throw new AssertionError("Delete messages did not complete"); } } public static void waitForWriteMessagesComplete(String persistenceId) { - if(!Uninterruptibles.awaitUninterruptibly(writeMessagesComplete.get(persistenceId).latch, 5, TimeUnit.SECONDS)) { + if (!Uninterruptibles.awaitUninterruptibly(WRITE_MESSAGES_COMPLETE.get(persistenceId).latch, + 5, TimeUnit.SECONDS)) { throw new AssertionError("Journal write messages did not complete"); } } public static void addDeleteMessagesCompleteLatch(String persistenceId) { - deleteMessagesCompleteLatches.put(persistenceId, new CountDownLatch(1)); + DELETE_MESSAGES_COMPLETE_LATCHES.put(persistenceId, new CountDownLatch(1)); } public static void addWriteMessagesCompleteLatch(String persistenceId, int count) { - writeMessagesComplete.put(persistenceId, new WriteMessagesComplete(count, null)); + WRITE_MESSAGES_COMPLETE.put(persistenceId, new WriteMessagesComplete(count, null)); } public static void addWriteMessagesCompleteLatch(String persistenceId, int count, Class ofType) { - writeMessagesComplete.put(persistenceId, new WriteMessagesComplete(count, ofType)); + WRITE_MESSAGES_COMPLETE.put(persistenceId, new WriteMessagesComplete(count, ofType)); } public static void addBlockReadMessagesLatch(String persistenceId, CountDownLatch latch) { - blockReadMessagesLatches.put(persistenceId, latch); + BLOCK_READ_MESSAGES_LATCHES.put(persistenceId, latch); } @Override @@ -151,33 +152,30 @@ public class InMemoryJournal extends AsyncWriteJournal { final long toSequenceNr, final long max, final Consumer replayCallback) { LOG.trace("doAsyncReplayMessages for {}: fromSequenceNr: {}, toSequenceNr: {}", persistenceId, fromSequenceNr,toSequenceNr); - return Futures.future(new Callable() { - @Override - public Void call() throws Exception { - CountDownLatch blockLatch = blockReadMessagesLatches.remove(persistenceId); - if(blockLatch != null) { - Uninterruptibles.awaitUninterruptibly(blockLatch); - } + return Futures.future(() -> { + CountDownLatch blockLatch = BLOCK_READ_MESSAGES_LATCHES.remove(persistenceId); + if (blockLatch != null) { + Uninterruptibles.awaitUninterruptibly(blockLatch); + } - Map journal = journals.get(persistenceId); - if (journal == null) { - return null; - } + Map journal = JOURNALS.get(persistenceId); + if (journal == null) { + return null; + } - synchronized (journal) { - int count = 0; - for (Map.Entry entry : journal.entrySet()) { - if (++count <= max && entry.getKey() >= fromSequenceNr && entry.getKey() <= toSequenceNr) { - PersistentRepr persistentMessage = - new PersistentImpl(deserialize(entry.getValue()), entry.getKey(), persistenceId, - null, false, null, null); - replayCallback.accept(persistentMessage); - } + synchronized (journal) { + int count = 0; + for (Map.Entry entry : journal.entrySet()) { + if (++count <= max && entry.getKey() >= fromSequenceNr && entry.getKey() <= toSequenceNr) { + PersistentRepr persistentMessage = + new PersistentImpl(deserialize(entry.getValue()), entry.getKey(), persistenceId, + null, false, null, null); + replayCallback.accept(persistentMessage); } } - - return null; } + + return null; }, context().dispatcher()); } @@ -186,15 +184,15 @@ public class InMemoryJournal extends AsyncWriteJournal { LOG.trace("doAsyncReadHighestSequenceNr for {}: fromSequenceNr: {}", persistenceId, fromSequenceNr); // Akka calls this during recovery. - Map journal = journals.get(persistenceId); - if(journal == null) { + Map journal = JOURNALS.get(persistenceId); + if (journal == null) { return Futures.successful(fromSequenceNr); } synchronized (journal) { long highest = -1; for (Long seqNr : journal.keySet()) { - if(seqNr.longValue() >= fromSequenceNr && seqNr.longValue() > highest) { + if (seqNr.longValue() >= fromSequenceNr && seqNr.longValue() > highest) { highest = seqNr.longValue(); } } @@ -205,51 +203,48 @@ public class InMemoryJournal extends AsyncWriteJournal { @Override public Future>> doAsyncWriteMessages(final Iterable messages) { - return Futures.future(new Callable>>() { - @Override - public Iterable> call() throws Exception { - for (AtomicWrite write : messages) { - // Copy to array - workaround for eclipse "ambiguous method" errors for toIterator, toIterable etc - PersistentRepr[] array = new PersistentRepr[write.payload().size()]; - write.payload().copyToArray(array); - for(PersistentRepr repr: array) { - LOG.trace("doAsyncWriteMessages: id: {}: seqNr: {}, payload: {}", repr.persistenceId(), - repr.sequenceNr(), repr.payload()); - - addEntry(repr.persistenceId(), repr.sequenceNr(), repr.payload()); - - WriteMessagesComplete complete = writeMessagesComplete.get(repr.persistenceId()); - if(complete != null) { - if(complete.ofType == null || complete.ofType.equals(repr.payload().getClass())) { - complete.latch.countDown(); - } + return Futures.future(() -> { + for (AtomicWrite write : messages) { + // Copy to array - workaround for eclipse "ambiguous method" errors for toIterator, toIterable etc + PersistentRepr[] array = new PersistentRepr[write.payload().size()]; + write.payload().copyToArray(array); + for (PersistentRepr repr: array) { + LOG.trace("doAsyncWriteMessages: id: {}: seqNr: {}, payload: {}", repr.persistenceId(), + repr.sequenceNr(), repr.payload()); + + addEntry(repr.persistenceId(), repr.sequenceNr(), repr.payload()); + + WriteMessagesComplete complete = WRITE_MESSAGES_COMPLETE.get(repr.persistenceId()); + if (complete != null) { + if (complete.ofType == null || complete.ofType.equals(repr.payload().getClass())) { + complete.latch.countDown(); } } } - - return Collections.emptyList(); } + + return Collections.emptyList(); }, context().dispatcher()); } @Override public Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) { LOG.trace("doAsyncDeleteMessagesTo: {}", toSequenceNr); - Map journal = journals.get(persistenceId); - if(journal != null) { + Map journal = JOURNALS.get(persistenceId); + if (journal != null) { synchronized (journal) { Iterator iter = journal.keySet().iterator(); - while(iter.hasNext()) { - Long n = iter.next(); - if(n <= toSequenceNr) { + while (iter.hasNext()) { + Long num = iter.next(); + if (num <= toSequenceNr) { iter.remove(); } } } } - CountDownLatch latch = deleteMessagesCompleteLatches.get(persistenceId); - if(latch != null) { + CountDownLatch latch = DELETE_MESSAGES_COMPLETE_LATCHES.get(persistenceId); + if (latch != null) { latch.countDown(); }