2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft.utils;
import akka.dispatch.Futures;
import akka.japi.Procedure;
import akka.persistence.PersistentConfirmation;
import akka.persistence.PersistentId;
import akka.persistence.PersistentImpl;
import akka.persistence.PersistentRepr;
import akka.persistence.journal.japi.AsyncWriteJournal;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
33 * An akka AsyncWriteJournal implementation that stores data in memory. This is intended for testing.
35 * @author Thomas Pantelis
37 public class InMemoryJournal extends AsyncWriteJournal {
39 static final Logger LOG = LoggerFactory.getLogger(InMemoryJournal.class);
41 private static final Map<String, Map<Long, Object>> journals = new ConcurrentHashMap<>();
43 private static final Map<String, CountDownLatch> deleteMessagesCompleteLatches = new ConcurrentHashMap<>();
45 private static final Map<String, CountDownLatch> writeMessagesCompleteLatches = new ConcurrentHashMap<>();
47 private static final Map<String, CountDownLatch> blockReadMessagesLatches = new ConcurrentHashMap<>();
49 public static void addEntry(String persistenceId, long sequenceNr, Object data) {
50 Map<Long, Object> journal = journals.get(persistenceId);
52 journal = Maps.newLinkedHashMap();
53 journals.put(persistenceId, journal);
56 synchronized (journal) {
57 journal.put(sequenceNr, data);
61 public static void clear() {
65 @SuppressWarnings("unchecked")
66 public static <T> List<T> get(String persistenceId, Class<T> type) {
67 Map<Long, Object> journalMap = journals.get(persistenceId);
68 if(journalMap == null) {
69 return Collections.<T>emptyList();
72 synchronized (journalMap) {
73 List<T> journal = new ArrayList<>(journalMap.size());
74 for(Object entry: journalMap.values()) {
75 if(type.isInstance(entry)) {
76 journal.add((T) entry);
84 public static Map<Long, Object> get(String persistenceId) {
85 Map<Long, Object> journalMap = journals.get(persistenceId);
86 return journalMap != null ? journalMap : Collections.<Long, Object>emptyMap();
89 public static void dumpJournal(String persistenceId) {
90 StringBuilder builder = new StringBuilder(String.format("Journal log for %s:", persistenceId));
91 Map<Long, Object> journalMap = journals.get(persistenceId);
92 if(journalMap != null) {
93 synchronized (journalMap) {
94 for(Map.Entry<Long, Object> e: journalMap.entrySet()) {
95 builder.append("\n ").append(e.getKey()).append(" = ").append(e.getValue());
100 LOG.info(builder.toString());
103 public static void waitForDeleteMessagesComplete(String persistenceId) {
104 if(!Uninterruptibles.awaitUninterruptibly(deleteMessagesCompleteLatches.get(persistenceId), 5, TimeUnit.SECONDS)) {
105 throw new AssertionError("Delete messages did not complete");
109 public static void waitForWriteMessagesComplete(String persistenceId) {
110 if(!Uninterruptibles.awaitUninterruptibly(writeMessagesCompleteLatches.get(persistenceId), 5, TimeUnit.SECONDS)) {
111 throw new AssertionError("Journal write messages did not complete");
115 public static void addDeleteMessagesCompleteLatch(String persistenceId) {
116 deleteMessagesCompleteLatches.put(persistenceId, new CountDownLatch(1));
119 public static void addWriteMessagesCompleteLatch(String persistenceId, int count) {
120 writeMessagesCompleteLatches.put(persistenceId, new CountDownLatch(count));
123 public static void addBlockReadMessagesLatch(String persistenceId, CountDownLatch latch) {
124 blockReadMessagesLatches.put(persistenceId, latch);
128 public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr,
129 long toSequenceNr, long max, final Procedure<PersistentRepr> replayCallback) {
130 return Futures.future(new Callable<Void>() {
132 public Void call() throws Exception {
133 CountDownLatch blockLatch = blockReadMessagesLatches.remove(persistenceId);
134 if(blockLatch != null) {
135 Uninterruptibles.awaitUninterruptibly(blockLatch);
138 Map<Long, Object> journal = journals.get(persistenceId);
139 if(journal == null) {
143 synchronized (journal) {
144 for (Map.Entry<Long,Object> entry : journal.entrySet()) {
145 PersistentRepr persistentMessage =
146 new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId,
148 replayCallback.apply(persistentMessage);
154 }, context().dispatcher());
158 public Future<Long> doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) {
159 // Akka calls this during recovery.
161 Map<Long, Object> journal = journals.get(persistenceId);
162 if(journal == null) {
163 return Futures.successful(-1L);
166 synchronized (journal) {
168 for (Long seqNr : journal.keySet()) {
169 if(seqNr.longValue() >= fromSequenceNr && seqNr.longValue() > highest) {
170 highest = seqNr.longValue();
174 return Futures.successful(highest);
179 public Future<Void> doAsyncWriteMessages(final Iterable<PersistentRepr> messages) {
180 return Futures.future(new Callable<Void>() {
182 public Void call() throws Exception {
183 for (PersistentRepr repr : messages) {
184 Map<Long, Object> journal = journals.get(repr.persistenceId());
185 if(journal == null) {
186 journal = Maps.newLinkedHashMap();
187 journals.put(repr.persistenceId(), journal);
190 synchronized (journal) {
191 LOG.trace("doAsyncWriteMessages: id: {}: seqNr: {}, payload: {}", repr.persistenceId(),
192 repr.sequenceNr(), repr.payload());
193 journal.put(repr.sequenceNr(), repr.payload());
196 CountDownLatch latch = writeMessagesCompleteLatches.get(repr.persistenceId());
204 }, context().dispatcher());
208 public Future<Void> doAsyncWriteConfirmations(Iterable<PersistentConfirmation> confirmations) {
209 return Futures.successful(null);
213 public Future<Void> doAsyncDeleteMessages(Iterable<PersistentId> messageIds, boolean permanent) {
214 return Futures.successful(null);
218 public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) {
219 Map<Long, Object> journal = journals.get(persistenceId);
220 if(journal != null) {
221 synchronized (journal) {
222 Iterator<Long> iter = journal.keySet().iterator();
223 while(iter.hasNext()) {
224 Long n = iter.next();
225 if(n <= toSequenceNr) {
232 CountDownLatch latch = deleteMessagesCompleteLatches.get(persistenceId);
237 return Futures.successful(null);