// Add SegmentedFileJournal
// [controller.git] opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java
1 /*
2  * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.akka.segjournal;
9
10 import static com.google.common.base.Verify.verify;
11 import static com.google.common.base.Verify.verifyNotNull;
12 import static java.util.Objects.requireNonNull;
13
14 import akka.actor.AbstractActor;
15 import akka.actor.Props;
16 import akka.persistence.AtomicWrite;
17 import akka.persistence.PersistentRepr;
18 import com.codahale.metrics.Histogram;
19 import com.codahale.metrics.Meter;
20 import com.codahale.metrics.SlidingTimeWindowReservoir;
21 import com.codahale.metrics.Timer;
22 import com.google.common.base.MoreObjects;
23 import io.atomix.storage.StorageLevel;
24 import io.atomix.storage.journal.Indexed;
25 import io.atomix.storage.journal.SegmentedJournal;
26 import io.atomix.storage.journal.SegmentedJournalReader;
27 import io.atomix.storage.journal.SegmentedJournalWriter;
28 import io.atomix.utils.serializer.Namespace;
29 import java.io.File;
30 import java.io.Serializable;
31 import java.util.ArrayList;
32 import java.util.List;
33 import java.util.Optional;
34 import java.util.concurrent.TimeUnit;
35 import java.util.function.Consumer;
36 import org.opendaylight.controller.akka.segjournal.DataJournalEntry.FromPersistence;
37 import org.opendaylight.controller.akka.segjournal.DataJournalEntry.ToPersistence;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 import scala.collection.Iterator;
41 import scala.collection.SeqLike;
42 import scala.concurrent.Future;
43 import scala.concurrent.Promise;
44
45 /**
46  * This actor handles a single PersistentActor's journal. The journal is split into two {@link SegmentedJournal}s:
47  * <ul>
48  *     <li>A memory-mapped data journal, containing actual data entries</li>
49  *     <li>A simple file journal, containing sequence numbers of last deleted entry</li>
50  * </ul>
51  *
52  * <p>
53  * This is a conscious design decision to minimize the amount of data that is being stored in the data journal while
54  * speeding up normal operations. Since the SegmentedJournal is an append-only linear log and Akka requires the ability
55  * to delete persistence entries, we need ability to mark a subset of a SegmentedJournal as deleted. While we could
56  * treat such delete requests as normal events, this leads to a mismatch between SegmentedJournal indices (as exposed by
57  * {@link Indexed}) and Akka sequence numbers -- requiring us to potentially perform costly deserialization to find the
58  * index corresponding to a particular sequence number, or maintain moderately-complex logic and data structures to
59  * perform that mapping in sub-linear time complexity.
60  *
61  * <p>
62  * Split-file approach allows us to treat sequence numbers and indices as equivalent, without maintaining any explicit
63  * mapping information. The only additional information we need to maintain is the last deleted sequence number.
64  *
65  * @author Robert Varga
66  */
67 final class SegmentedJournalActor extends AbstractActor {
68     abstract static class AsyncMessage<T> {
69         final Promise<T> promise = Promise.apply();
70     }
71
72     private static final class ReadHighestSequenceNr extends AsyncMessage<Long> {
73         private final long fromSequenceNr;
74
75         ReadHighestSequenceNr(final long fromSequenceNr) {
76             this.fromSequenceNr = fromSequenceNr;
77         }
78
79         @Override
80         public String toString() {
81             return MoreObjects.toStringHelper(this).add("fromSequenceNr", fromSequenceNr).toString();
82         }
83     }
84
85     private static final class ReplayMessages extends AsyncMessage<Void> {
86         private final long fromSequenceNr;
87         private final long toSequenceNr;
88         private final long max;
89         private final Consumer<PersistentRepr> replayCallback;
90
91         ReplayMessages(final long fromSequenceNr,
92                 final long toSequenceNr, final long max, final Consumer<PersistentRepr> replayCallback) {
93             this.fromSequenceNr = fromSequenceNr;
94             this.toSequenceNr = toSequenceNr;
95             this.max = max;
96             this.replayCallback = requireNonNull(replayCallback);
97         }
98
99         @Override
100         public String toString() {
101             return MoreObjects.toStringHelper(this).add("fromSequenceNr", fromSequenceNr)
102                     .add("toSequenceNr", toSequenceNr).add("max", max).toString();
103         }
104     }
105
106     static final class WriteMessages {
107         private final List<AtomicWrite> requests = new ArrayList<>();
108         private final List<Promise<Optional<Exception>>> results = new ArrayList<>();
109
110         Future<Optional<Exception>> add(final AtomicWrite write) {
111             final Promise<Optional<Exception>> promise = Promise.apply();
112             requests.add(write);
113             results.add(promise);
114             return promise.future();
115         }
116
117         @Override
118         public String toString() {
119             return MoreObjects.toStringHelper(this).add("requests", requests).toString();
120         }
121     }
122
123     private static final class DeleteMessagesTo extends AsyncMessage<Void> {
124         final long toSequenceNr;
125
126         DeleteMessagesTo(final long toSequenceNr) {
127             this.toSequenceNr = toSequenceNr;
128         }
129
130         @Override
131         public String toString() {
132             return MoreObjects.toStringHelper(this).add("toSequenceNr", toSequenceNr).toString();
133         }
134     }
135
136     private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournalActor.class);
137     private static final Namespace DELETE_NAMESPACE = Namespace.builder().register(Long.class).build();
138     private static final int DELETE_SEGMENT_SIZE = 64 * 1024;
139
140     // Tracks the time it took us to write a batch of messages
141     private final Timer batchWriteTime = new Timer();
142     // Tracks the number of individual messages written
143     private final Meter messageWriteCount = new Meter();
144     // Tracks the size distribution of messages for last 5 minutes
145     private final Histogram messageSize = new Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.MINUTES));
146
147     private final String persistenceId;
148     private final StorageLevel storage;
149     private final int maxSegmentSize;
150     private final int maxEntrySize;
151     private final File directory;
152
153     private SegmentedJournal<DataJournalEntry> dataJournal;
154     private SegmentedJournal<Long> deleteJournal;
155     private long lastDelete;
156
157     // Tracks largest message size we have observed either during recovery or during write
158     private int largestObservedSize;
159
160     SegmentedJournalActor(final String persistenceId, final File directory, final StorageLevel storage,
161             final int maxEntrySize, final int maxSegmentSize) {
162         this.persistenceId = requireNonNull(persistenceId);
163         this.directory = requireNonNull(directory);
164         this.storage = requireNonNull(storage);
165         this.maxEntrySize = maxEntrySize;
166         this.maxSegmentSize = maxSegmentSize;
167     }
168
169     static Props props(final String persistenceId, final File directory, final StorageLevel storage,
170             final int maxEntrySize, final int maxSegmentSize) {
171         return Props.create(SegmentedJournalActor.class, requireNonNull(persistenceId), directory, storage,
172             maxEntrySize, maxSegmentSize);
173     }
174
175     @Override
176     public Receive createReceive() {
177         return receiveBuilder()
178                 .match(DeleteMessagesTo.class, this::handleDeleteMessagesTo)
179                 .match(ReadHighestSequenceNr.class, this::handleReadHighestSequenceNr)
180                 .match(ReplayMessages.class, this::handleReplayMessages)
181                 .match(WriteMessages.class, this::handleWriteMessages)
182                 .matchAny(this::handleUnknown)
183                 .build();
184     }
185
186     @Override
187     public void preStart() throws Exception {
188         LOG.debug("{}: actor starting", persistenceId);
189         super.preStart();
190     }
191
192     @Override
193     public void postStop() throws Exception {
194         LOG.debug("{}: actor stopping", persistenceId);
195         if (dataJournal != null) {
196             dataJournal.close();
197             LOG.debug("{}: data journal closed", persistenceId);
198             dataJournal = null;
199         }
200         if (deleteJournal != null) {
201             deleteJournal.close();
202             LOG.debug("{}: delete journal closed", persistenceId);
203             deleteJournal = null;
204         }
205         LOG.debug("{}: actor stopped", persistenceId);
206         super.postStop();
207     }
208
209     static AsyncMessage<Void> deleteMessagesTo(final long toSequenceNr) {
210         return new DeleteMessagesTo(toSequenceNr);
211     }
212
213     static AsyncMessage<Long> readHighestSequenceNr(final long fromSequenceNr) {
214         return new ReadHighestSequenceNr(fromSequenceNr);
215     }
216
217     static AsyncMessage<Void> replayMessages(final long fromSequenceNr, final long toSequenceNr, final long max,
218             final Consumer<PersistentRepr> replayCallback) {
219         return new ReplayMessages(fromSequenceNr, toSequenceNr, max, replayCallback);
220     }
221
222     private void handleDeleteMessagesTo(final DeleteMessagesTo message) {
223         ensureOpen();
224
225         LOG.debug("{}: delete messages {}", persistenceId, message);
226         final long to = Long.min(dataJournal.writer().getLastIndex(), message.toSequenceNr);
227         LOG.debug("{}: adjusted delete to {}", persistenceId, to);
228
229         if (lastDelete < to) {
230             LOG.debug("{}: deleting entries up to {}", persistenceId, to);
231
232             lastDelete = to;
233             final SegmentedJournalWriter<Long> deleteWriter = deleteJournal.writer();
234             final Indexed<Long> entry = deleteWriter.append(lastDelete);
235             deleteWriter.commit(entry.index());
236             dataJournal.writer().commit(lastDelete);
237
238             LOG.debug("{}: compaction started", persistenceId);
239             dataJournal.compact(lastDelete);
240             deleteJournal.compact(entry.index());
241             LOG.debug("{}: compaction finished", persistenceId);
242         } else {
243             LOG.debug("{}: entries up to {} already deleted", persistenceId, lastDelete);
244         }
245
246         message.promise.success(null);
247     }
248
249     @SuppressWarnings("checkstyle:illegalCatch")
250     private void handleReadHighestSequenceNr(final ReadHighestSequenceNr message) {
251         LOG.debug("{}: looking for highest sequence on {}", persistenceId, message);
252         final Long sequence;
253         if (directory.isDirectory()) {
254             ensureOpen();
255             sequence = dataJournal.writer().getLastIndex();
256         } else {
257             sequence = 0L;
258         }
259
260         LOG.debug("{}: highest sequence is {}", message, sequence);
261         message.promise.success(sequence);
262     }
263
264     @SuppressWarnings("checkstyle:illegalCatch")
265     private void handleReplayMessages(final ReplayMessages message) {
266         LOG.debug("{}: replaying messages {}", persistenceId, message);
267         ensureOpen();
268
269         final long from = Long.max(lastDelete + 1, message.fromSequenceNr);
270         LOG.debug("{}: adjusted fromSequenceNr to {}", persistenceId, from);
271
272         try (SegmentedJournalReader<DataJournalEntry> reader = dataJournal.openReader(from)) {
273             int count = 0;
274             while (reader.hasNext() && count < message.max) {
275                 final Indexed<DataJournalEntry> next = reader.next();
276                 if (next.index() > message.toSequenceNr) {
277                     break;
278                 }
279
280                 LOG.trace("{}: replay {}", persistenceId, next);
281                 updateLargestSize(next.size());
282                 final DataJournalEntry entry = next.entry();
283                 verify(entry instanceof FromPersistence, "Unexpected entry %s", entry);
284
285                 final PersistentRepr repr = ((FromPersistence) entry).toRepr(persistenceId, next.index());
286                 LOG.debug("{}: replaying {}", persistenceId, repr);
287                 message.replayCallback.accept(repr);
288                 count++;
289             }
290             LOG.debug("{}: successfully replayed {} entries", persistenceId, count);
291         } catch (Exception e) {
292             LOG.warn("{}: failed to replay messages for {}", persistenceId, message, e);
293             message.promise.failure(e);
294         } finally {
295             message.promise.success(null);
296         }
297     }
298
299     @SuppressWarnings("checkstyle:illegalCatch")
300     private void handleWriteMessages(final WriteMessages message) {
301         ensureOpen();
302
303         final SegmentedJournalWriter<DataJournalEntry> writer = dataJournal.writer();
304         final long startTicks = System.nanoTime();
305         final int count = message.requests.size();
306         final long start = writer.getLastIndex();
307
308         for (int i = 0; i < count; ++i) {
309             final long mark = writer.getLastIndex();
310             try {
311                 writeRequest(writer, message.requests.get(i));
312             } catch (Exception e) {
313                 LOG.warn("{}: failed to write out request", persistenceId, e);
314                 message.results.get(i).success(Optional.of(e));
315                 writer.truncate(mark);
316                 continue;
317             }
318
319             message.results.get(i).success(Optional.empty());
320         }
321         writer.flush();
322         batchWriteTime.update(System.nanoTime() - startTicks, TimeUnit.NANOSECONDS);
323         messageWriteCount.mark(writer.getLastIndex() - start);
324     }
325
326     private void writeRequest(final SegmentedJournalWriter<DataJournalEntry> writer, final AtomicWrite request) {
327         // Cast is needed for Eclipse because of https://bugs.eclipse.org/bugs/show_bug.cgi?id=468276
328         final Iterator<PersistentRepr> it = ((SeqLike<PersistentRepr, ?>) request.payload()).iterator();
329         while (it.hasNext()) {
330             final PersistentRepr repr = it.next();
331             final Object payload = repr.payload();
332             if (!(payload instanceof Serializable)) {
333                 throw new UnsupportedOperationException("Non-serializable payload encountered " + payload.getClass());
334             }
335
336             final int size = writer.append(new ToPersistence(repr)).size();
337             messageSize.update(size);
338             updateLargestSize(size);
339         }
340     }
341
342     private void handleUnknown(final Object message) {
343         LOG.error("{}: Received unknown message {}", persistenceId, message);
344     }
345
346     private void updateLargestSize(final int size) {
347         if (size > largestObservedSize) {
348             largestObservedSize = size;
349         }
350     }
351
352     private void ensureOpen() {
353         if (dataJournal != null) {
354             verifyNotNull(deleteJournal);
355             return;
356         }
357
358         deleteJournal = SegmentedJournal.<Long>builder().withDirectory(directory).withName("delete")
359                 .withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build();
360         final Indexed<Long> lastEntry = deleteJournal.writer().getLastEntry();
361         lastDelete = lastEntry == null ? 0 : lastEntry.index();
362
363         dataJournal = SegmentedJournal.<DataJournalEntry>builder()
364                 .withStorageLevel(storage).withDirectory(directory).withName("data")
365                 .withNamespace(Namespace.builder()
366                     .register(new DataJournalEntrySerializer(context().system()),
367                         FromPersistence.class, ToPersistence.class)
368                     .build())
369                 .withMaxEntrySize(maxEntrySize).withMaxSegmentSize(maxSegmentSize)
370                 .build();
371         final SegmentedJournalWriter<DataJournalEntry> writer = dataJournal.writer();
372         writer.commit(lastDelete);
373         LOG.debug("{}: journal open with last index {}, deleted to {}", persistenceId, writer.getLastIndex(),
374             lastDelete);
375     }
376 }