Add a segmented DataJournal abstraction
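
SegmentedJournalActor previously drove a SegmentedJournal<DataJournalEntry>
directly from its message handlers, mixing storage-format details (entry
serialization, reader iteration, message-size tracking) into actor code.
Hide those details behind a DataJournal abstraction, with DataJournalV0
implementing the current on-disk format. The actor keeps batch timing and
write-count metering; the message-size histogram and largest-size tracking
move into the journal implementation.

DataJournal and DataJournalV0 are separate files not included in this diff.
The sketch below is reconstructed purely from the call sites visible here,
so the method names match the diff, but the exact signatures, modifiers and
constructor are assumptions:

    // Sketch only: inferred from call sites in SegmentedJournalActor,
    // not the actual DataJournal.java added by this commit.
    // (Assumed to live in org.opendaylight.controller.akka.segjournal;
    // imports such as com.codahale.metrics.Histogram elided.)
    abstract class DataJournal {
        // Shared bookkeeping handed over by SegmentedJournalActor
        final String persistenceId;
        final Histogram messageSize;

        DataJournal(final String persistenceId, final Histogram messageSize) {
            this.persistenceId = requireNonNull(persistenceId);
            this.messageSize = requireNonNull(messageSize);
        }

        // Index of the last entry written to the data journal
        abstract long lastWrittenIndex();

        // Mark entries up to and including lastIndex as committed
        abstract void commitTo(long lastIndex);

        // Discard entries with indices below firstIndex
        abstract void compactTo(long firstIndex);

        // Replay stored entries to the request's callback, starting from
        // the delete-adjusted sequence number computed by the actor
        abstract void handleReplayMessages(SegmentedJournalActor.ReplayMessages message, long fromSequenceNr);

        // Append a batch of writes, reporting per-request outcomes
        abstract void handleWriteMessages(SegmentedJournalActor.WriteMessages message);
    }
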
diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java
index 1739b25cab023c19f72be61675813977faf6e90a..41117c8e32754f47a7b7a258cd803d31d83eedab 100644
--- a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java
+++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.akka.segjournal;
 
-import static com.google.common.base.Verify.verify;
 import static com.google.common.base.Verify.verifyNotNull;
 import static java.util.Objects.requireNonNull;
 
@@ -23,24 +22,18 @@ import com.google.common.base.MoreObjects;
 import io.atomix.storage.StorageLevel;
 import io.atomix.storage.journal.Indexed;
 import io.atomix.storage.journal.SegmentedJournal;
-import io.atomix.storage.journal.SegmentedJournalReader;
 import io.atomix.storage.journal.SegmentedJournalWriter;
 import io.atomix.utils.serializer.Namespace;
 import java.io.File;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import org.opendaylight.controller.akka.segjournal.DataJournalEntry.FromPersistence;
-import org.opendaylight.controller.akka.segjournal.DataJournalEntry.ToPersistence;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.collection.Iterator;
-import scala.collection.SeqLike;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
 
@@ -84,11 +77,11 @@ final class SegmentedJournalActor extends AbstractActor {
         }
     }
 
-    private static final class ReplayMessages extends AsyncMessage<Void> {
+    static final class ReplayMessages extends AsyncMessage<Void> {
         private final long fromSequenceNr;
-        private final long toSequenceNr;
-        private final long max;
-        private final Consumer<PersistentRepr> replayCallback;
+        final long toSequenceNr;
+        final long max;
+        final Consumer<PersistentRepr> replayCallback;
 
         ReplayMessages(final long fromSequenceNr,
                 final long toSequenceNr, final long max, final Consumer<PersistentRepr> replayCallback) {
@@ -116,6 +109,22 @@ final class SegmentedJournalActor extends AbstractActor {
             return promise.future();
         }
 
+        int size() {
+            return requests.size();
+        }
+
+        AtomicWrite getRequest(final int index) {
+            return requests.get(index);
+        }
+
+        void setFailure(final int index, final Exception cause) {
+            results.get(index).success(Optional.of(cause));
+        }
+
+        void setSuccess(final int index) {
+            results.get(index).success(Optional.empty());
+        }
+
         @Override
         public String toString() {
             return MoreObjects.toStringHelper(this).add("requests", requests).toString();
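
The new package-visible accessors give journal implementations a stable
surface for iterating a write batch: size() and getRequest() walk the
AtomicWrites, while setSuccess()/setFailure() complete the per-request
result promises. A hypothetical driver loop inside a DataJournal
implementation, mirroring the truncate-on-failure logic this commit removes
from handleWriteMessages() further down ('writer' stands for the
implementation's SegmentedJournalWriter<DataJournalEntry>, and
writeRequest() for the per-write helper that likewise moves out of the
actor; both names are assumptions):

    // Sketch of a batch-write loop built on the new accessors.
    void handleWriteMessages(final SegmentedJournalActor.WriteMessages message) {
        final int count = message.size();
        for (int i = 0; i < count; ++i) {
            // Remember the last good index so a failed request can be undone
            final long mark = writer.getLastIndex();
            try {
                writeRequest(writer, message.getRequest(i));
            } catch (Exception e) {
                message.setFailure(i, e);
                // Roll back the partially-appended request
                writer.truncate(mark);
                continue;
            }
            message.setSuccess(i);
        }
        writer.flush();
    }
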
@@ -152,13 +161,10 @@ final class SegmentedJournalActor extends AbstractActor {
     // Tracks the size distribution of messages
     private Histogram messageSize;
 
-    private SegmentedJournal<DataJournalEntry> dataJournal;
+    private DataJournal dataJournal;
     private SegmentedJournal<Long> deleteJournal;
     private long lastDelete;
 
-    // Tracks largest message size we have observed either during recovery or during write
-    private int largestObservedSize;
-
     SegmentedJournalActor(final String persistenceId, final File directory, final StorageLevel storage,
             final int maxEntrySize, final int maxSegmentSize) {
         this.persistenceId = requireNonNull(persistenceId);
@@ -232,7 +238,7 @@ final class SegmentedJournalActor extends AbstractActor {
         ensureOpen();
 
         LOG.debug("{}: delete messages {}", persistenceId, message);
-        final long to = Long.min(dataJournal.writer().getLastIndex(), message.toSequenceNr);
+        final long to = Long.min(dataJournal.lastWrittenIndex(), message.toSequenceNr);
         LOG.debug("{}: adjusted delete to {}", persistenceId, to);
 
         if (lastDelete < to) {
@@ -242,10 +248,10 @@ final class SegmentedJournalActor extends AbstractActor {
             final SegmentedJournalWriter<Long> deleteWriter = deleteJournal.writer();
             final Indexed<Long> entry = deleteWriter.append(lastDelete);
             deleteWriter.commit(entry.index());
-            dataJournal.writer().commit(lastDelete);
+            dataJournal.commitTo(lastDelete);
 
             LOG.debug("{}: compaction started", persistenceId);
-            dataJournal.compact(lastDelete + 1);
+            dataJournal.compactTo(lastDelete + 1);
             deleteJournal.compact(entry.index());
             LOG.debug("{}: compaction finished", persistenceId);
         } else {
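
As a concrete example of the handler above: with lastDelete previously at
10, a request to delete through sequence number 15 when lastWrittenIndex()
is 12 gets clamped to 12. The delete journal appends and commits the new
bound, the data journal commits through 12 and compacts everything below
index 13, and the delete journal itself is compacted up to the entry just
appended. Because the bound is persisted separately, replay can clamp
fromSequenceNr to lastDelete + 1 after a restart, as the later hunks show.
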
@@ -255,13 +261,12 @@ final class SegmentedJournalActor extends AbstractActor {
         message.promise.success(null);
     }
 
-    @SuppressWarnings("checkstyle:illegalCatch")
     private void handleReadHighestSequenceNr(final ReadHighestSequenceNr message) {
         LOG.debug("{}: looking for highest sequence on {}", persistenceId, message);
         final Long sequence;
         if (directory.isDirectory()) {
             ensureOpen();
-            sequence = dataJournal.writer().getLastIndex();
+            sequence = dataJournal.lastWrittenIndex();
         } else {
             sequence = 0L;
         }
@@ -278,86 +283,25 @@ final class SegmentedJournalActor extends AbstractActor {
         final long from = Long.max(lastDelete + 1, message.fromSequenceNr);
         LOG.debug("{}: adjusted fromSequenceNr to {}", persistenceId, from);
 
-        try (SegmentedJournalReader<DataJournalEntry> reader = dataJournal.openReader(from)) {
-            int count = 0;
-            while (reader.hasNext() && count < message.max) {
-                final Indexed<DataJournalEntry> next = reader.next();
-                if (next.index() > message.toSequenceNr) {
-                    break;
-                }
-
-                LOG.trace("{}: replay {}", persistenceId, next);
-                updateLargestSize(next.size());
-                final DataJournalEntry entry = next.entry();
-                verify(entry instanceof FromPersistence, "Unexpected entry %s", entry);
-
-                final PersistentRepr repr = ((FromPersistence) entry).toRepr(persistenceId, next.index());
-                LOG.debug("{}: replaying {}", persistenceId, repr);
-                message.replayCallback.accept(repr);
-                count++;
-            }
-            LOG.debug("{}: successfully replayed {} entries", persistenceId, count);
-        } catch (Exception e) {
-            LOG.warn("{}: failed to replay messages for {}", persistenceId, message, e);
-            message.promise.failure(e);
-        } finally {
-            message.promise.success(null);
-        }
+        dataJournal.handleReplayMessages(message, from);
     }
 
-    @SuppressWarnings("checkstyle:illegalCatch")
     private void handleWriteMessages(final WriteMessages message) {
         ensureOpen();
 
-        final SegmentedJournalWriter<DataJournalEntry> writer = dataJournal.writer();
         final long startTicks = System.nanoTime();
-        final int count = message.requests.size();
-        final long start = writer.getLastIndex();
-
-        for (int i = 0; i < count; ++i) {
-            final long mark = writer.getLastIndex();
-            try {
-                writeRequest(writer, message.requests.get(i));
-            } catch (Exception e) {
-                LOG.warn("{}: failed to write out request", persistenceId, e);
-                message.results.get(i).success(Optional.of(e));
-                writer.truncate(mark);
-                continue;
-            }
-
-            message.results.get(i).success(Optional.empty());
-        }
-        writer.flush();
-        batchWriteTime.update(System.nanoTime() - startTicks, TimeUnit.NANOSECONDS);
-        messageWriteCount.mark(writer.getLastIndex() - start);
-    }
+        final long start = dataJournal.lastWrittenIndex();
 
-    private void writeRequest(final SegmentedJournalWriter<DataJournalEntry> writer, final AtomicWrite request) {
-        // Cast is needed for Eclipse because of https://bugs.eclipse.org/bugs/show_bug.cgi?id=468276
-        final Iterator<PersistentRepr> it = ((SeqLike<PersistentRepr, ?>) request.payload()).iterator();
-        while (it.hasNext()) {
-            final PersistentRepr repr = it.next();
-            final Object payload = repr.payload();
-            if (!(payload instanceof Serializable)) {
-                throw new UnsupportedOperationException("Non-serializable payload encountered " + payload.getClass());
-            }
-
-            final int size = writer.append(new ToPersistence(repr)).size();
-            messageSize.update(size);
-            updateLargestSize(size);
-        }
+        dataJournal.handleWriteMessages(message);
+
+        batchWriteTime.update(System.nanoTime() - startTicks, TimeUnit.NANOSECONDS);
+        messageWriteCount.mark(dataJournal.lastWrittenIndex() - start);
     }
 
     private void handleUnknown(final Object message) {
         LOG.error("{}: Received unknown message {}", persistenceId, message);
     }
 
-    private void updateLargestSize(final int size) {
-        if (size > largestObservedSize) {
-            largestObservedSize = size;
-        }
-    }
-
     private void ensureOpen() {
         if (dataJournal != null) {
             verifyNotNull(deleteJournal);
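
For reference, here is roughly where the replay loop removed above
presumably lands, rewritten as a DataJournal implementation method.
'entries' stands for the implementation's underlying
SegmentedJournal<DataJournalEntry> and LOG for its own logger; both names,
and package access to the request's promise, are assumptions. Unlike the
removed block, whose finally clause would attempt to complete a promise the
catch had already failed, this sketch completes the promise exactly once:

    // Sketch: replay serviced inside a DataJournal implementation.
    void handleReplayMessages(final SegmentedJournalActor.ReplayMessages message, final long fromSequenceNr) {
        try (SegmentedJournalReader<DataJournalEntry> reader = entries.openReader(fromSequenceNr)) {
            int count = 0;
            while (reader.hasNext() && count < message.max) {
                final Indexed<DataJournalEntry> next = reader.next();
                if (next.index() > message.toSequenceNr) {
                    break;
                }
                final DataJournalEntry entry = next.entry();
                verify(entry instanceof FromPersistence, "Unexpected entry %s", entry);
                message.replayCallback.accept(((FromPersistence) entry).toRepr(persistenceId, next.index()));
                count++;
            }
            message.promise.success(null);
        } catch (Exception e) {
            LOG.warn("{}: failed to replay messages for {}", persistenceId, message, e);
            message.promise.failure(e);
        }
    }
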
@@ -369,17 +313,10 @@ final class SegmentedJournalActor extends AbstractActor {
         final Indexed<Long> lastEntry = deleteJournal.writer().getLastEntry();
         lastDelete = lastEntry == null ? 0 : lastEntry.entry();
 
-        dataJournal = SegmentedJournal.<DataJournalEntry>builder()
-                .withStorageLevel(storage).withDirectory(directory).withName("data")
-                .withNamespace(Namespace.builder()
-                    .register(new DataJournalEntrySerializer(context().system()),
-                        FromPersistence.class, ToPersistence.class)
-                    .build())
-                .withMaxEntrySize(maxEntrySize).withMaxSegmentSize(maxSegmentSize)
-                .build();
-        final SegmentedJournalWriter<DataJournalEntry> writer = dataJournal.writer();
-        writer.commit(lastDelete);
-        LOG.debug("{}: journal open with last index {}, deleted to {}", persistenceId, writer.getLastIndex(),
+        dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,
+            maxEntrySize, maxSegmentSize);
+        dataJournal.commitTo(lastDelete);
+        LOG.debug("{}: journal open with last index {}, deleted to {}", persistenceId, dataJournal.lastWrittenIndex(),
             lastDelete);
     }
 }