Add a segmented DataJournal abstraction (Gerrit change 15/92615/7)
author: Robert Varga <robert.varga@pantheon.tech>
Wed, 23 Sep 2020 10:24:52 +0000 (12:24 +0200)
committer: Robert Varga <nite@hq.sk>
Fri, 2 Oct 2020 14:05:36 +0000 (14:05 +0000)
We are going to change the way we organize journal entries within
a segmented file. Add DataJournal abstraction which exposes
a unified interface towards the SegmentedJournalActor.

Current implementation is split out from SegmentedJournalActor
as DataJournalV0.

JIRA: CONTROLLER-1954
Change-Id: I0d1de42b22e75610d0434548483091e3359123e4
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournal.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java

diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournal.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournal.java
new file mode 100644 (file)
index 0000000..3f74690
--- /dev/null
@@ -0,0 +1,56 @@
/*
 * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.akka.segjournal;

import static java.util.Objects.requireNonNull;

import com.codahale.metrics.Histogram;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.ReplayMessages;
import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.WriteMessages;

/**
 * Abstraction of a data journal. This provides a unified interface towards {@link SegmentedJournalActor}, allowing
 * specialization for various on-disk formats.
 */
abstract class DataJournal {
    // Mirrors fields from associated actor
    final @NonNull String persistenceId;
    private final Histogram messageSize;

    // Tracks largest message size we have observed either during recovery or during write
    private int largestObservedSize;

    DataJournal(final String persistenceId, final Histogram messageSize) {
        this.persistenceId = requireNonNull(persistenceId);
        this.messageSize = requireNonNull(messageSize);
    }

    /**
     * Record the serialized size of a written message, feeding both the size histogram and the largest-size
     * tracker.
     *
     * @param size serialized message size in bytes
     */
    final void recordMessageSize(final int size) {
        messageSize.update(size);
        updateLargestSize(size);
    }

    /**
     * Update the largest observed message size, for example when encountering existing entries during replay.
     *
     * @param size serialized message size in bytes
     */
    final void updateLargestSize(final int size) {
        largestObservedSize = Math.max(largestObservedSize, size);
    }

    /**
     * Return the index of the last entry written to this journal.
     *
     * @return index of the last written entry
     */
    abstract long lastWrittenIndex();

    /**
     * Commit entries up to and including the specified index.
     *
     * @param index journal index to commit to
     */
    abstract void commitTo(long index);

    /**
     * Discard entries preceding the specified index, reclaiming storage occupied by them.
     *
     * @param index first journal index to retain
     */
    abstract void compactTo(long index);

    /**
     * Close this journal, releasing any resources it may hold.
     */
    abstract void close();

    /**
     * Handle a request to replay messages, completing the request's promise when done.
     *
     * @param message the {@link ReplayMessages} request
     * @param from the first journal index to replay, already adjusted for prior deletions
     */
    abstract void handleReplayMessages(ReplayMessages message, long from);

    /**
     * Handle a request to write messages, recording the per-request outcome on the message itself.
     *
     * @param message the {@link WriteMessages} request
     */
    abstract void handleWriteMessages(WriteMessages message);
}
diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java
new file mode 100644 (file)
index 0000000..766f2fa
--- /dev/null
@@ -0,0 +1,134 @@
/*
 * Copyright (c) 2019, 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.akka.segjournal;

import static com.google.common.base.Verify.verify;

import akka.actor.ActorSystem;
import akka.persistence.AtomicWrite;
import akka.persistence.PersistentRepr;
import com.codahale.metrics.Histogram;
import io.atomix.storage.StorageLevel;
import io.atomix.storage.journal.Indexed;
import io.atomix.storage.journal.SegmentedJournal;
import io.atomix.storage.journal.SegmentedJournalReader;
import io.atomix.storage.journal.SegmentedJournalWriter;
import io.atomix.utils.serializer.Namespace;
import java.io.File;
import java.io.Serializable;
import org.opendaylight.controller.akka.segjournal.DataJournalEntry.FromPersistence;
import org.opendaylight.controller.akka.segjournal.DataJournalEntry.ToPersistence;
import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.ReplayMessages;
import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.WriteMessages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.jdk.javaapi.CollectionConverters;

/**
 * Version 0 data journal, where every journal entry maps to exactly one segmented file entry.
 *
 * @author Robert Varga
 */
final class DataJournalV0 extends DataJournal {
    private static final Logger LOG = LoggerFactory.getLogger(DataJournalV0.class);

    // Backing segmented file, holding one DataJournalEntry per journal entry
    private final SegmentedJournal<DataJournalEntry> entries;

    DataJournalV0(final String persistenceId, final Histogram messageSize, final ActorSystem system,
            final StorageLevel storage, final File directory, final int maxEntrySize, final int maxSegmentSize) {
        super(persistenceId, messageSize);
        entries = SegmentedJournal.<DataJournalEntry>builder()
                .withStorageLevel(storage).withDirectory(directory).withName("data")
                .withNamespace(Namespace.builder()
                    .register(new DataJournalEntrySerializer(system), FromPersistence.class, ToPersistence.class)
                    .build())
                .withMaxEntrySize(maxEntrySize).withMaxSegmentSize(maxSegmentSize)
                .build();
    }

    @Override
    long lastWrittenIndex() {
        return entries.writer().getLastIndex();
    }

    @Override
    void commitTo(final long index) {
        entries.writer().commit(index);
    }

    @Override
    void compactTo(final long index) {
        entries.compact(index);
    }

    @Override
    void close() {
        entries.close();
    }

    @Override
    @SuppressWarnings("checkstyle:illegalCatch")
    void handleReplayMessages(final ReplayMessages message, final long from) {
        try (SegmentedJournalReader<DataJournalEntry> reader = entries.openReader(from)) {
            int count = 0;
            while (reader.hasNext() && count < message.max) {
                final Indexed<DataJournalEntry> next = reader.next();
                // Stop once we pass the requested upper bound
                if (next.index() > message.toSequenceNr) {
                    break;
                }

                LOG.trace("{}: replay {}", persistenceId, next);
                updateLargestSize(next.size());
                final DataJournalEntry entry = next.entry();
                verify(entry instanceof FromPersistence, "Unexpected entry %s", entry);

                final PersistentRepr repr = ((FromPersistence) entry).toRepr(persistenceId, next.index());
                LOG.debug("{}: replaying {}", persistenceId, repr);
                message.replayCallback.accept(repr);
                count++;
            }
            LOG.debug("{}: successfully replayed {} entries", persistenceId, count);
            // Complete the promise exactly once. Completing it in a finally block after failure() has already
            // been invoked would make Promise.success() throw IllegalStateException on the completed promise,
            // masking the original replay failure.
            message.promise.success(null);
        } catch (Exception e) {
            LOG.warn("{}: failed to replay messages for {}", persistenceId, message, e);
            message.promise.failure(e);
        }
    }

    @Override
    @SuppressWarnings("checkstyle:illegalCatch")
    void handleWriteMessages(final WriteMessages message) {
        final int count = message.size();
        final SegmentedJournalWriter<DataJournalEntry> writer = entries.writer();

        for (int i = 0; i < count; ++i) {
            // Remember where this request started so we can roll it back atomically on failure
            final long mark = writer.getLastIndex();
            final AtomicWrite request = message.getRequest(i);
            try {
                for (PersistentRepr repr : CollectionConverters.asJava(request.payload())) {
                    final Object payload = repr.payload();
                    if (!(payload instanceof Serializable)) {
                        throw new UnsupportedOperationException("Non-serializable payload encountered "
                                + payload.getClass());
                    }

                    recordMessageSize(writer.append(new ToPersistence(repr)).size());
                }
            } catch (Exception e) {
                LOG.warn("{}: failed to write out request", persistenceId, e);
                message.setFailure(i, e);
                // Undo any partial append of this AtomicWrite and move on to the next request
                writer.truncate(mark);
                continue;
            }

            message.setSuccess(i);
        }
        // Single flush per batch amortizes the sync cost across all requests
        writer.flush();
    }
}
index 4e0a54c..41117c8 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.akka.segjournal;
 
-import static com.google.common.base.Verify.verify;
 import static com.google.common.base.Verify.verifyNotNull;
 import static java.util.Objects.requireNonNull;
 
@@ -23,25 +22,20 @@ import com.google.common.base.MoreObjects;
 import io.atomix.storage.StorageLevel;
 import io.atomix.storage.journal.Indexed;
 import io.atomix.storage.journal.SegmentedJournal;
-import io.atomix.storage.journal.SegmentedJournalReader;
 import io.atomix.storage.journal.SegmentedJournalWriter;
 import io.atomix.utils.serializer.Namespace;
 import java.io.File;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import org.opendaylight.controller.akka.segjournal.DataJournalEntry.FromPersistence;
-import org.opendaylight.controller.akka.segjournal.DataJournalEntry.ToPersistence;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
-import scala.jdk.javaapi.CollectionConverters;
 
 /**
  * This actor handles a single PersistentActor's journal. The journal is split into two {@link SegmentedJournal}s:
@@ -83,11 +77,11 @@ final class SegmentedJournalActor extends AbstractActor {
         }
     }
 
-    private static final class ReplayMessages extends AsyncMessage<Void> {
+    static final class ReplayMessages extends AsyncMessage<Void> {
         private final long fromSequenceNr;
-        private final long toSequenceNr;
-        private final long max;
-        private final Consumer<PersistentRepr> replayCallback;
+        final long toSequenceNr;
+        final long max;
+        final Consumer<PersistentRepr> replayCallback;
 
         ReplayMessages(final long fromSequenceNr,
                 final long toSequenceNr, final long max, final Consumer<PersistentRepr> replayCallback) {
@@ -115,6 +109,23 @@ final class SegmentedJournalActor extends AbstractActor {
             return promise.future();
         }
 
+        int size() {
+            return requests.size();
+        }
+
+        AtomicWrite getRequest(final int index) {
+            return requests.get(index);
+        }
+
+        void setFailure(final int index, final Exception cause) {
+            results.get(index).success(Optional.of(cause));
+
+        }
+
+        void setSuccess(final int index) {
+            results.get(index).success(Optional.empty());
+        }
+
         @Override
         public String toString() {
             return MoreObjects.toStringHelper(this).add("requests", requests).toString();
@@ -151,13 +162,10 @@ final class SegmentedJournalActor extends AbstractActor {
     // Tracks the size distribution of messages
     private Histogram messageSize;
 
-    private SegmentedJournal<DataJournalEntry> dataJournal;
+    private DataJournal dataJournal;
     private SegmentedJournal<Long> deleteJournal;
     private long lastDelete;
 
-    // Tracks largest message size we have observed either during recovery or during write
-    private int largestObservedSize;
-
     SegmentedJournalActor(final String persistenceId, final File directory, final StorageLevel storage,
             final int maxEntrySize, final int maxSegmentSize) {
         this.persistenceId = requireNonNull(persistenceId);
@@ -231,7 +239,7 @@ final class SegmentedJournalActor extends AbstractActor {
         ensureOpen();
 
         LOG.debug("{}: delete messages {}", persistenceId, message);
-        final long to = Long.min(dataJournal.writer().getLastIndex(), message.toSequenceNr);
+        final long to = Long.min(dataJournal.lastWrittenIndex(), message.toSequenceNr);
         LOG.debug("{}: adjusted delete to {}", persistenceId, to);
 
         if (lastDelete < to) {
@@ -241,10 +249,10 @@ final class SegmentedJournalActor extends AbstractActor {
             final SegmentedJournalWriter<Long> deleteWriter = deleteJournal.writer();
             final Indexed<Long> entry = deleteWriter.append(lastDelete);
             deleteWriter.commit(entry.index());
-            dataJournal.writer().commit(lastDelete);
+            dataJournal.commitTo(lastDelete);
 
             LOG.debug("{}: compaction started", persistenceId);
-            dataJournal.compact(lastDelete + 1);
+            dataJournal.compactTo(lastDelete + 1);
             deleteJournal.compact(entry.index());
             LOG.debug("{}: compaction finished", persistenceId);
         } else {
@@ -259,7 +267,7 @@ final class SegmentedJournalActor extends AbstractActor {
         final Long sequence;
         if (directory.isDirectory()) {
             ensureOpen();
-            sequence = dataJournal.writer().getLastIndex();
+            sequence = dataJournal.lastWrittenIndex();
         } else {
             sequence = 0L;
         }
@@ -276,83 +284,25 @@ final class SegmentedJournalActor extends AbstractActor {
         final long from = Long.max(lastDelete + 1, message.fromSequenceNr);
         LOG.debug("{}: adjusted fromSequenceNr to {}", persistenceId, from);
 
-        try (SegmentedJournalReader<DataJournalEntry> reader = dataJournal.openReader(from)) {
-            int count = 0;
-            while (reader.hasNext() && count < message.max) {
-                final Indexed<DataJournalEntry> next = reader.next();
-                if (next.index() > message.toSequenceNr) {
-                    break;
-                }
-
-                LOG.trace("{}: replay {}", persistenceId, next);
-                updateLargestSize(next.size());
-                final DataJournalEntry entry = next.entry();
-                verify(entry instanceof FromPersistence, "Unexpected entry %s", entry);
-
-                final PersistentRepr repr = ((FromPersistence) entry).toRepr(persistenceId, next.index());
-                LOG.debug("{}: replaying {}", persistenceId, repr);
-                message.replayCallback.accept(repr);
-                count++;
-            }
-            LOG.debug("{}: successfully replayed {} entries", persistenceId, count);
-        } catch (Exception e) {
-            LOG.warn("{}: failed to replay messages for {}", persistenceId, message, e);
-            message.promise.failure(e);
-        } finally {
-            message.promise.success(null);
-        }
+        dataJournal.handleReplayMessages(message, from);
     }
 
-    @SuppressWarnings("checkstyle:illegalCatch")
     private void handleWriteMessages(final WriteMessages message) {
         ensureOpen();
 
-        final SegmentedJournalWriter<DataJournalEntry> writer = dataJournal.writer();
         final long startTicks = System.nanoTime();
-        final int count = message.requests.size();
-        final long start = writer.getLastIndex();
-
-        for (int i = 0; i < count; ++i) {
-            final long mark = writer.getLastIndex();
-            try {
-                writeRequest(writer, message.requests.get(i));
-            } catch (Exception e) {
-                LOG.warn("{}: failed to write out request", persistenceId, e);
-                message.results.get(i).success(Optional.of(e));
-                writer.truncate(mark);
-                continue;
-            }
-
-            message.results.get(i).success(Optional.empty());
-        }
-        writer.flush();
-        batchWriteTime.update(System.nanoTime() - startTicks, TimeUnit.NANOSECONDS);
-        messageWriteCount.mark(writer.getLastIndex() - start);
-    }
+        final long start = dataJournal.lastWrittenIndex();
 
-    private void writeRequest(final SegmentedJournalWriter<DataJournalEntry> writer, final AtomicWrite request) {
-        for (PersistentRepr repr : CollectionConverters.asJava(request.payload())) {
-            final Object payload = repr.payload();
-            if (!(payload instanceof Serializable)) {
-                throw new UnsupportedOperationException("Non-serializable payload encountered " + payload.getClass());
-            }
+        dataJournal.handleWriteMessages(message);
 
-            final int size = writer.append(new ToPersistence(repr)).size();
-            messageSize.update(size);
-            updateLargestSize(size);
-        }
+        batchWriteTime.update(System.nanoTime() - startTicks, TimeUnit.NANOSECONDS);
+        messageWriteCount.mark(dataJournal.lastWrittenIndex() - start);
     }
 
     private void handleUnknown(final Object message) {
         LOG.error("{}: Received unknown message {}", persistenceId, message);
     }
 
-    private void updateLargestSize(final int size) {
-        if (size > largestObservedSize) {
-            largestObservedSize = size;
-        }
-    }
-
     private void ensureOpen() {
         if (dataJournal != null) {
             verifyNotNull(deleteJournal);
@@ -364,17 +314,10 @@ final class SegmentedJournalActor extends AbstractActor {
         final Indexed<Long> lastEntry = deleteJournal.writer().getLastEntry();
         lastDelete = lastEntry == null ? 0 : lastEntry.entry();
 
-        dataJournal = SegmentedJournal.<DataJournalEntry>builder()
-                .withStorageLevel(storage).withDirectory(directory).withName("data")
-                .withNamespace(Namespace.builder()
-                    .register(new DataJournalEntrySerializer(context().system()),
-                        FromPersistence.class, ToPersistence.class)
-                    .build())
-                .withMaxEntrySize(maxEntrySize).withMaxSegmentSize(maxSegmentSize)
-                .build();
-        final SegmentedJournalWriter<DataJournalEntry> writer = dataJournal.writer();
-        writer.commit(lastDelete);
-        LOG.debug("{}: journal open with last index {}, deleted to {}", persistenceId, writer.getLastIndex(),
+        dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,
+            maxEntrySize, maxSegmentSize);
+        dataJournal.commitTo(lastDelete);
+        LOG.debug("{}: journal open with last index {}, deleted to {}", persistenceId, dataJournal.lastWrittenIndex(),
             lastDelete);
     }
 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.