Allow segmented journal to flush periodically
author Robert Varga <robert.varga@pantheon.tech>
Tue, 19 Mar 2024 20:42:14 +0000 (21:42 +0100)
committer Robert Varga <robert.varga@pantheon.tech>
Tue, 26 Mar 2024 11:46:37 +0000 (12:46 +0100)
Flushes to disk end up dominating our use of disk resources, as we issue
a flush after each write. This is not entirely efficient: multiple writes
may be outstanding in the actor queue, and we currently ignore that
batching opportunity.

This patch makes it possible to configure an upper bound on the number
of written bytes that may remain unflushed.
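
For illustration only, the new knob would sit next to the existing journal
sizing options, along the lines of the factory-akka.conf hunk further below
(the exact stanza path and values here are examples, not prescriptions):

    # inside the segmented journal configuration block
    max-entry-size = 16M
    max-segment-size = 128M
    # maximum number of bytes written without synchronizing storage;
    # 0 (the default) keeps the current flush-after-every-write behaviour
    max-unflushed-bytes = 1M
    memory-mapped = false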

We flush whenever we reach this watermark, or once we have written all
messages that had been submitted by the time the flush batch was started.
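
As a minimal, self-contained sketch of that bookkeeping (using hypothetical
names such as FlushTracker and onProbe, not the actual SegmentedJournalActor
code): accumulate written bytes, flush at the watermark, and otherwise send a
probe to self so the flush happens once the currently-queued writes drain.

    final class FlushTracker {
        private final long maxUnflushedBytes;
        private long unflushedBytes;
        private long batch;

        FlushTracker(final long maxUnflushedBytes) {
            this.maxUnflushedBytes = maxUnflushedBytes;
        }

        // Called after each write; returns the batch number to probe for, or
        // -1 if no probe message needs to be sent to self.
        long onWritten(final long writtenBytes) {
            final boolean first = unflushedBytes == 0;
            unflushedBytes += writtenBytes;
            if (unflushedBytes >= maxUnflushedBytes) {
                flush();
                return -1;
            }
            return first ? ++batch : -1;
        }

        // Called when the probe message arrives back; stale probes are ignored.
        void onProbe(final long probeBatch) {
            if (probeBatch == batch) {
                flush();
            }
        }

        private void flush() {
            // The real actor flushes the journal writer and completes the
            // pending WriteMessages futures at this point.
            unflushedBytes = 0;
        }
    }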

JIRA: CONTROLLER-2108
Change-Id: I6f18de7871c89b5feffecc71580e1f440024f2a3
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournal.java
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournal.java
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java
opendaylight/md-sal/sal-akka-segmented-journal/src/test/java/org/opendaylight/controller/akka/segjournal/PerformanceTest.java
opendaylight/md-sal/sal-akka-segmented-journal/src/test/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournalTest.java
opendaylight/md-sal/sal-akka-segmented-journal/src/test/resources/SegmentedFileJournalTest.conf
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/factory-akka.conf
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/segmented.conf

index 9ba9394429ff82f831e1283c8020bf8c62c25dab..b89ebf4eb131b2c1b458781c98954f06bf900b09 100644 (file)
@@ -13,6 +13,7 @@ import com.codahale.metrics.Histogram;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.ReplayMessages;
 import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.WriteMessages;
+import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.WrittenMessages;
 
 /**
  * Abstraction of a data journal. This provides a unified interface towards {@link SegmentedJournalActor}, allowing
@@ -79,8 +80,13 @@ abstract class DataJournal {
     /**
      * Handle a request to store some messages.
      *
-     * @param message Request message
-     * @return number of bytes written
+     * @param message {@link WriteMessages} message
+     * @return a {@link WrittenMessages} object
+     */
+    abstract @NonNull WrittenMessages handleWriteMessages(@NonNull WriteMessages message);
+
+    /**
+     * Flush all messages to durable storage.
      */
-    abstract long handleWriteMessages(@NonNull WriteMessages message);
+    abstract void flush();
 }
index 24e8fec03aa6168e378a3b26ec136f452aea0dfa..243a064b80fea81b9fed2a522dacb9f0105aabc4 100644 (file)
@@ -18,11 +18,13 @@ import io.atomix.storage.journal.SegmentedJournal;
 import io.atomix.storage.journal.StorageLevel;
 import java.io.File;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.List;
 import org.opendaylight.controller.akka.segjournal.DataJournalEntry.FromPersistence;
 import org.opendaylight.controller.akka.segjournal.DataJournalEntry.ToPersistence;
 import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.ReplayMessages;
 import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.WriteMessages;
+import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.WrittenMessages;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.jdk.javaapi.CollectionConverters;
@@ -64,9 +66,15 @@ final class DataJournalV0 extends DataJournal {
 
     @Override
     void close() {
+        flush();
         entries.close();
     }
 
+    @Override
+    void flush() {
+        entries.writer().flush();
+    }
+
     @Override
     @SuppressWarnings("checkstyle:illegalCatch")
     void handleReplayMessages(final ReplayMessages message, final long fromSequenceNr) {
@@ -105,10 +113,11 @@ final class DataJournalV0 extends DataJournal {
 
     @Override
     @SuppressWarnings("checkstyle:illegalCatch")
-    long handleWriteMessages(final WriteMessages message) {
+    WrittenMessages handleWriteMessages(final WriteMessages message) {
         final int count = message.size();
+        final var responses = new ArrayList<>();
         final var writer = entries.writer();
-        long bytes = 0;
+        long writtenBytes = 0;
 
         for (int i = 0; i < count; ++i) {
             final long mark = writer.getLastIndex();
@@ -117,18 +126,17 @@ final class DataJournalV0 extends DataJournal {
             final var reprs = CollectionConverters.asJava(request.payload());
             LOG.trace("{}: append {}/{}: {} items at mark {}", persistenceId, i, count, reprs.size(), mark);
             try {
-                bytes += writePayload(writer, reprs);
+                writtenBytes += writePayload(writer, reprs);
             } catch (Exception e) {
                 LOG.warn("{}: failed to write out request {}/{} reverting to {}", persistenceId, i, count, mark, e);
-                message.setFailure(i, e);
+                responses.add(e);
                 writer.truncate(mark);
                 continue;
             }
-
-            message.setSuccess(i);
+            responses.add(null);
         }
-        writer.flush();
-        return bytes;
+
+        return new WrittenMessages(message, responses, writtenBytes);
     }
 
     private long writePayload(final JournalWriter<DataJournalEntry> writer, final List<PersistentRepr> reprs) {
index 1324067bc8c71a47c222f53318a33c9e30282a86..41a6add0ed7d6c29fbe3c2658836c7dc2c3aad01 100644 (file)
@@ -44,6 +44,8 @@ public class SegmentedFileJournal extends AsyncWriteJournal {
     public static final int STORAGE_MAX_ENTRY_SIZE_DEFAULT = 16 * 1024 * 1024;
     public static final String STORAGE_MAX_SEGMENT_SIZE = "max-segment-size";
     public static final int STORAGE_MAX_SEGMENT_SIZE_DEFAULT = STORAGE_MAX_ENTRY_SIZE_DEFAULT * 8;
+    public static final String STORAGE_MAX_UNFLUSHED_BYTES = "max-unflushed-bytes";
+    public static final int STORAGE_MAX_UNFLUSHED_BYTES_DEFAULT = 0;
     public static final String STORAGE_MEMORY_MAPPED = "memory-mapped";
 
     private static final Logger LOG = LoggerFactory.getLogger(SegmentedFileJournal.class);
@@ -53,6 +55,7 @@ public class SegmentedFileJournal extends AsyncWriteJournal {
     private final StorageLevel storage;
     private final int maxEntrySize;
     private final int maxSegmentSize;
+    private final int maxUnflushedBytes;
 
     public SegmentedFileJournal(final Config config) {
         rootDir = new File(config.getString(STORAGE_ROOT_DIRECTORY));
@@ -64,6 +67,7 @@ public class SegmentedFileJournal extends AsyncWriteJournal {
 
         maxEntrySize = getBytes(config, STORAGE_MAX_ENTRY_SIZE, STORAGE_MAX_ENTRY_SIZE_DEFAULT);
         maxSegmentSize = getBytes(config, STORAGE_MAX_SEGMENT_SIZE, STORAGE_MAX_SEGMENT_SIZE_DEFAULT);
+        maxUnflushedBytes = getBytes(config, STORAGE_MAX_UNFLUSHED_BYTES, STORAGE_MAX_UNFLUSHED_BYTES_DEFAULT);
 
         if (config.hasPath(STORAGE_MEMORY_MAPPED)) {
             storage = config.getBoolean(STORAGE_MEMORY_MAPPED) ? StorageLevel.MAPPED : StorageLevel.DISK;
@@ -117,7 +121,7 @@ public class SegmentedFileJournal extends AsyncWriteJournal {
         LOG.debug("Creating handler for {} in directory {}", persistenceId, directory);
 
         final var handler = context().actorOf(SegmentedJournalActor.props(persistenceId, directory, storage,
-            maxEntrySize, maxSegmentSize));
+            maxEntrySize, maxSegmentSize, maxUnflushedBytes));
         LOG.debug("Directory {} handled by {}", directory, handler);
         return handler;
     }
@@ -141,9 +145,8 @@ public class SegmentedFileJournal extends AsyncWriteJournal {
         if (!config.hasPath(path)) {
             return defaultValue;
         }
-        final var value = config.getMemorySize(path);
-        final long result = value.toBytes();
-        checkArgument(result <= Integer.MAX_VALUE, "Size %s exceeds maximum allowed %s", Integer.MAX_VALUE);
-        return (int) result;
+        final long value = config.getBytes(path);
+        checkArgument(value <= Integer.MAX_VALUE, "Size %s exceeds maximum allowed %s", value, Integer.MAX_VALUE);
+        return (int) value;
     }
 }
index cefc4f12698e57cc110cf5c807b245bb7a546cc9..9f63892d26fdfd949b46d0bb9213bd84df2deb80 100644 (file)
@@ -7,11 +7,14 @@
  */
 package org.opendaylight.controller.akka.segjournal;
 
+import static com.google.common.base.Verify.verify;
 import static com.google.common.base.Verify.verifyNotNull;
 import static java.util.Objects.requireNonNull;
 
 import akka.actor.AbstractActor;
+import akka.actor.ActorRef;
 import akka.actor.Props;
+import akka.japi.pf.ReceiveBuilder;
 import akka.persistence.AtomicWrite;
 import akka.persistence.PersistentRepr;
 import com.codahale.metrics.Histogram;
@@ -25,6 +28,7 @@ import io.atomix.storage.journal.JournalSerdes;
 import io.atomix.storage.journal.SegmentedJournal;
 import io.atomix.storage.journal.StorageLevel;
 import java.io.File;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -57,8 +61,8 @@ import scala.concurrent.Promise;
  * Split-file approach allows us to treat sequence numbers and indices as equivalent, without maintaining any explicit
  * mapping information. The only additional information we need to maintain is the last deleted sequence number.
  */
-final class SegmentedJournalActor extends AbstractActor {
-    abstract static class AsyncMessage<T> {
+abstract sealed class SegmentedJournalActor extends AbstractActor {
+    abstract static sealed class AsyncMessage<T> {
         final Promise<T> promise = Promise.apply();
     }
 
@@ -143,6 +147,134 @@ final class SegmentedJournalActor extends AbstractActor {
         }
     }
 
+    // each responses entry is null on success, or the Exception on failure
+    record WrittenMessages(WriteMessages message, List<Object> responses, long writtenBytes) {
+        WrittenMessages {
+            verify(responses.size() == message.size(), "Mismatched %s and %s", message, responses);
+            verify(writtenBytes >= 0, "Unexpected length %s", writtenBytes);
+        }
+
+        private void complete() {
+            for (int i = 0, size = responses.size(); i < size; ++i) {
+                if (responses.get(i) instanceof Exception ex) {
+                    message.setFailure(i, ex);
+                } else {
+                    message.setSuccess(i);
+                }
+            }
+        }
+    }
+
+    /**
+     * A {@link SegmentedJournalActor} which delays issuing a flush operation until a watermark is reached or when the
+     * queue is empty.
+     *
+     * <p>
+     * The problem we are addressing is that there is a queue sitting in front of the actor, which we have no direct
+     * access to. Since a flush involves committing data to durable storage, that operation can easily end up dominating
+     * workloads.
+     *
+     * <p>
+     * We solve this by having an additional queue in which we track which messages were written and trigger a flush
+     * only when the number of bytes we have written exceeds the specified limit. The other part is that each time this
+     * queue becomes non-empty, we send a dedicated message to self. This acts as an actor queue probe -- when we receive
+     * it, we know we have processed all messages that were in the queue when we first delayed the write.
+     *
+     * <p>
+     * The combination of these mechanisms ensures we flush with minimal delay while also taking advantage of
+     * batching opportunities.
+     */
+    private static final class Delayed extends SegmentedJournalActor {
+        private static final class Flush extends AsyncMessage<Void> {
+            final long batch;
+
+            Flush(final long batch) {
+                this.batch = batch;
+            }
+        }
+
+        private final ArrayDeque<WrittenMessages> unflushedWrites = new ArrayDeque<>();
+        private final Stopwatch unflushedDuration = Stopwatch.createUnstarted();
+        private final long maxUnflushedBytes;
+
+        private long batch = 0;
+        private long unflushedBytes = 0;
+
+        Delayed(final String persistenceId, final File directory, final StorageLevel storage,
+                final int maxEntrySize, final int maxSegmentSize, final int maxUnflushedBytes) {
+            super(persistenceId, directory, storage, maxEntrySize, maxSegmentSize);
+            this.maxUnflushedBytes = maxUnflushedBytes;
+        }
+
+        @Override
+        ReceiveBuilder addMessages(final ReceiveBuilder builder) {
+            return super.addMessages(builder).match(Flush.class, this::handleFlush);
+        }
+
+        private void handleFlush(final Flush message) {
+            if (message.batch == batch) {
+                flushWrites();
+            } else {
+                LOG.debug("{}: batch {} not flushed by {}", persistenceId(), batch, message.batch);
+            }
+        }
+
+        @Override
+        void onWrittenMessages(final WrittenMessages message) {
+            boolean first = unflushedWrites.isEmpty();
+            if (first) {
+                unflushedDuration.start();
+            }
+            unflushedWrites.addLast(message);
+            unflushedBytes = unflushedBytes + message.writtenBytes;
+            if (unflushedBytes >= maxUnflushedBytes) {
+                LOG.debug("{}: reached {} unflushed journal bytes", persistenceId(), unflushedBytes);
+                flushWrites();
+            } else if (first) {
+                LOG.debug("{}: deferring journal flush", persistenceId());
+                self().tell(new Flush(++batch), ActorRef.noSender());
+            }
+        }
+
+        @Override
+        void flushWrites() {
+            final var unsyncedSize = unflushedWrites.size();
+            if (unsyncedSize == 0) {
+                // Nothing to flush
+                return;
+            }
+
+            LOG.debug("{}: flushing {} journal writes after {}", persistenceId(), unsyncedSize,
+                unflushedDuration.stop());
+            flushJournal(unflushedBytes, unsyncedSize);
+
+            final var sw = Stopwatch.createStarted();
+            unflushedWrites.forEach(WrittenMessages::complete);
+            unflushedWrites.clear();
+            unflushedBytes = 0;
+            unflushedDuration.reset();
+            LOG.debug("{}: completed {} flushed journal writes in {}", persistenceId(), unsyncedSize, sw);
+        }
+    }
+
+    private static final class Immediate extends SegmentedJournalActor {
+        Immediate(final String persistenceId, final File directory, final StorageLevel storage,
+                final int maxEntrySize, final int maxSegmentSize) {
+            super(persistenceId, directory, storage, maxEntrySize, maxSegmentSize);
+        }
+
+        @Override
+        void onWrittenMessages(final WrittenMessages message) {
+            flushJournal(message.writtenBytes, 1);
+            message.complete();
+        }
+
+        @Override
+        void flushWrites() {
+            // No-op
+        }
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournalActor.class);
     private static final JournalSerdes DELETE_NAMESPACE = JournalSerdes.builder()
         .register(LongEntrySerdes.LONG_ENTRY_SERDES, Long.class)
@@ -161,12 +293,18 @@ final class SegmentedJournalActor extends AbstractActor {
     private Meter messageWriteCount;
     // Tracks the size distribution of messages
     private Histogram messageSize;
+    // Tracks the number of messages completed for each flush
+    private Histogram flushMessages;
+    // Tracks the number of bytes completed for each flush
+    private Histogram flushBytes;
+    // Tracks the duration of flush operations
+    private Timer flushTime;
 
     private DataJournal dataJournal;
     private SegmentedJournal<Long> deleteJournal;
     private long lastDelete;
 
-    SegmentedJournalActor(final String persistenceId, final File directory, final StorageLevel storage,
+    private SegmentedJournalActor(final String persistenceId, final File directory, final StorageLevel storage,
             final int maxEntrySize, final int maxSegmentSize) {
         this.persistenceId = requireNonNull(persistenceId);
         this.directory = requireNonNull(directory);
@@ -176,20 +314,39 @@ final class SegmentedJournalActor extends AbstractActor {
     }
 
     static Props props(final String persistenceId, final File directory, final StorageLevel storage,
-            final int maxEntrySize, final int maxSegmentSize) {
-        return Props.create(SegmentedJournalActor.class, requireNonNull(persistenceId), directory, storage,
-            maxEntrySize, maxSegmentSize);
+            final int maxEntrySize, final int maxSegmentSize, final int maxUnflushedBytes) {
+        final var pid = requireNonNull(persistenceId);
+        return maxUnflushedBytes > 0
+            ? Props.create(Delayed.class, pid, directory, storage, maxEntrySize, maxSegmentSize, maxUnflushedBytes)
+            : Props.create(Immediate.class, pid, directory, storage, maxEntrySize, maxSegmentSize);
+    }
+
+    final String persistenceId() {
+        return persistenceId;
+    }
+
+    final void flushJournal(final long bytes, final int messages) {
+        final var sw = Stopwatch.createStarted();
+        dataJournal.flush();
+        LOG.debug("{}: journal flush completed in {}", persistenceId, sw.stop());
+        flushBytes.update(bytes);
+        flushMessages.update(messages);
+        flushTime.update(sw.elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
     }
 
     @Override
     public Receive createReceive() {
-        return receiveBuilder()
-                .match(DeleteMessagesTo.class, this::handleDeleteMessagesTo)
-                .match(ReadHighestSequenceNr.class, this::handleReadHighestSequenceNr)
-                .match(ReplayMessages.class, this::handleReplayMessages)
-                .match(WriteMessages.class, this::handleWriteMessages)
-                .matchAny(this::handleUnknown)
-                .build();
+        return addMessages(receiveBuilder())
+            .matchAny(this::handleUnknown)
+            .build();
+    }
+
+    ReceiveBuilder addMessages(final ReceiveBuilder builder) {
+        return builder
+            .match(DeleteMessagesTo.class, this::handleDeleteMessagesTo)
+            .match(ReadHighestSequenceNr.class, this::handleReadHighestSequenceNr)
+            .match(ReplayMessages.class, this::handleReplayMessages)
+            .match(WriteMessages.class, this::handleWriteMessages);
     }
 
     @Override
@@ -203,6 +360,9 @@ final class SegmentedJournalActor extends AbstractActor {
         batchWriteTime = registry.timer(MetricRegistry.name(actorName, "batchWriteTime"));
         messageWriteCount = registry.meter(MetricRegistry.name(actorName, "messageWriteCount"));
         messageSize = registry.histogram(MetricRegistry.name(actorName, "messageSize"));
+        flushBytes = registry.histogram(MetricRegistry.name(actorName, "flushBytes"));
+        flushMessages = registry.histogram(MetricRegistry.name(actorName, "flushMessages"));
+        flushTime = registry.timer(MetricRegistry.name(actorName, "flushTime"));
     }
 
     @Override
@@ -239,6 +399,8 @@ final class SegmentedJournalActor extends AbstractActor {
         ensureOpen();
 
         LOG.debug("{}: delete messages {}", persistenceId, message);
+        flushWrites();
+
         final long to = Long.min(dataJournal.lastWrittenSequenceNr(), message.toSequenceNr);
         LOG.debug("{}: adjusted delete to {}", persistenceId, to);
 
@@ -267,6 +429,7 @@ final class SegmentedJournalActor extends AbstractActor {
         final Long sequence;
         if (directory.isDirectory()) {
             ensureOpen();
+            flushWrites();
             sequence = dataJournal.lastWrittenSequenceNr();
         } else {
             sequence = 0L;
@@ -279,6 +442,7 @@ final class SegmentedJournalActor extends AbstractActor {
     private void handleReplayMessages(final ReplayMessages message) {
         LOG.debug("{}: replaying messages {}", persistenceId, message);
         ensureOpen();
+        flushWrites();
 
         final long from = Long.max(lastDelete + 1, message.fromSequenceNr);
         LOG.debug("{}: adjusted fromSequenceNr to {}", persistenceId, from);
@@ -291,16 +455,24 @@ final class SegmentedJournalActor extends AbstractActor {
 
         final var sw = Stopwatch.createStarted();
         final long start = dataJournal.lastWrittenSequenceNr();
-        final long bytes = dataJournal.handleWriteMessages(message);
+        final var writtenMessages = dataJournal.handleWriteMessages(message);
         sw.stop();
 
         batchWriteTime.update(sw.elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
         messageWriteCount.mark(dataJournal.lastWrittenSequenceNr() - start);
 
         // log message after statistics are updated
-        LOG.debug("{}: write of {} bytes completed in {}", persistenceId, bytes, sw);
+        LOG.debug("{}: write of {} bytes completed in {}", persistenceId, writtenMessages.writtenBytes, sw);
+        onWrittenMessages(writtenMessages);
     }
 
+    /**
+     * Handle a batch of messages that has been written out to the journal.
+     *
+     * @param message Messages which were written
+     */
+    abstract void onWrittenMessages(WrittenMessages message);
+
     private void handleUnknown(final Object message) {
         LOG.error("{}: Received unknown message {}", persistenceId, message);
     }
@@ -323,4 +495,7 @@ final class SegmentedJournalActor extends AbstractActor {
         LOG.debug("{}: journal open in {} with last index {}, deleted to {}", persistenceId, sw,
             dataJournal.lastWrittenSequenceNr(), lastDelete);
     }
+
+    abstract void flushWrites();
+
 }
index e72041231cde894adb5b635f4fa8ac0c8e34e726..636a5e1d3f7883a4cfa74947205d30751f487448 100644 (file)
@@ -103,7 +103,8 @@ class PerformanceTest {
         LOG.info("Test {} entrySize={} segmentSize={} payload={} count={}", storage, maxEntrySize, maxSegmentSize,
             payloadSize, requestCount);
 
-        actor = kit.childActorOf(SegmentedJournalActor.props("perf", DIRECTORY, storage, maxEntrySize, maxSegmentSize)
+        actor = kit.childActorOf(
+            SegmentedJournalActor.props("perf", DIRECTORY, storage, maxEntrySize, maxSegmentSize, maxEntrySize)
             .withDispatcher(CallingThreadDispatcher.Id()));
 
         final var random = ThreadLocalRandom.current();
index cf5891e256a1e052314add62e41ad798f0fb438a..4d3db7980e2116ef013f62b13ed120ddb43c33f9 100644 (file)
@@ -52,6 +52,7 @@ class SegmentedFileJournalTest {
     private static final File DIRECTORY = new File("target/sfj-test");
     private static final int SEGMENT_SIZE = 1024 * 1024;
     private static final int MESSAGE_SIZE = 512 * 1024;
+    private static final int FLUSH_SIZE = 16 * 1024;
 
     private static ActorSystem SYSTEM;
 
@@ -210,7 +211,7 @@ class SegmentedFileJournalTest {
 
     private ActorRef actor() {
         return kit.childActorOf(SegmentedJournalActor.props("foo", DIRECTORY, StorageLevel.DISK, MESSAGE_SIZE,
-            SEGMENT_SIZE).withDispatcher(CallingThreadDispatcher.Id()));
+            SEGMENT_SIZE, FLUSH_SIZE).withDispatcher(CallingThreadDispatcher.Id()));
     }
 
     private void deleteEntries(final long deleteTo) {
index 8fce93b097e1bd7effaacb95e351c87254dde1c8..e391a0dd5dcc5d2322f2176890cb2b1f9906bcb0 100644 (file)
@@ -8,6 +8,7 @@ akka {
                 root-directory = "target/segmented-journal"
                 max-entry-size = 8M
                 max-segment-size = 32M
+                max-unflushed-bytes = 256K
                 memory-mapped = false
             }
         }
index 8f9b5041eeb5c4f58368e2de58545f44064db2d0..00be0551f6909ce0cfc1441038dd0a967d9b6845 100644 (file)
@@ -163,6 +163,8 @@ odl-cluster-data {
           max-entry-size = 16M
           # Maximum size of a segment
           max-segment-size = 128M
+          # Maximum number of bytes that are written without synchronizing storage
+          max-unflushed-bytes = 1M
           # Map each segment into memory. Defaults to true, use false to keep a heap-based
           # buffer instead.
           memory-mapped = true
@@ -181,6 +183,8 @@ odl-cluster-data {
           max-entry-size = 512K
           # Maximum size of a segment
           max-segment-size = 1M
+          # Maximum number of bytes that are written without synchronizing storage
+          max-unflushed-bytes = 128K
           # Map each segment into memory. Note that while this can improve performance,
           # it will also place additional burden on system resources.
           memory-mapped = false
index eebc201ca11f73acf5b62d45483d62f7a8287b48..a21cc3e178ab6842d871a9d5917d6461fd2eab44 100644 (file)
@@ -28,6 +28,7 @@ Member1 {
           root-directory = "target/segmented-journal"
           max-entry-size = 8M
           max-segment-size = 32M
+          max-unflushed-bytes = 256K
           memory-mapped = false
         }
       }
@@ -73,4 +74,4 @@ Member1 {
       ]
     }
   }
-}
\ No newline at end of file
+}