Propagate SnapshotFileFormat to RaftStore 68/116068/2
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 25 Mar 2025 11:28:52 +0000 (12:28 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Tue, 25 Mar 2025 14:41:17 +0000 (15:41 +0100)
Each RaftStore needs to have a preferred file format. Hook it to
use-lz4-compression, hardcoding to 256KiB block size, just as we do when
we transfer to followers.

JIRA: CONTROLLER-1423
Change-Id: I7a59f386abc250fe7f813175650ad9374f4711f4
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
12 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
raft/raft-spi/src/main/java/org/opendaylight/raft/spi/SnapshotFileFormat.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/PekkoRaftStorage.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/PersistenceControl.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/spi/DisabledRaftStorage.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/spi/EnabledRaftStorage.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/spi/RaftStorage.java

index 840f215eafc8d9186818b7402f6c1a61f75500a8..197ae77c787b47bfb506904cd5be6aa2a1781219 100644 (file)
@@ -23,6 +23,7 @@ import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.raft.spi.SnapshotFileFormat;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev250130.DataStoreProperties.ExportOnRecovery;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import scala.concurrent.duration.FiniteDuration;
@@ -137,6 +138,7 @@ public class DatastoreContext implements ClientActorConfig {
         noProgressTimeout = other.noProgressTimeout;
         initialPayloadSerializedBufferCapacity = other.initialPayloadSerializedBufferCapacity;
         useLz4Compression = other.useLz4Compression;
+        raftConfig.setPreferredFileFormat(useLz4Compression ? SnapshotFileFormat.LZ4 : SnapshotFileFormat.PLAIN);
         exportOnRecovery = other.exportOnRecovery;
         recoveryExportBaseDir = other.recoveryExportBaseDir;
 
@@ -563,6 +565,8 @@ public class DatastoreContext implements ClientActorConfig {
 
         public Builder useLz4Compression(final boolean value) {
             datastoreContext.useLz4Compression = value;
+            datastoreContext.raftConfig.setPreferredFileFormat(
+                value ? SnapshotFileFormat.LZ4 : SnapshotFileFormat.PLAIN);
             return this;
         }
 
@@ -606,7 +610,7 @@ public class DatastoreContext implements ClientActorConfig {
         }
 
         public Builder fileBackedStreamingThresholdInMegabytes(final int fileBackedStreamingThreshold) {
-            datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE);
+            datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * 1_048_576);
             return this;
         }
 
index fad949bcaeb418cdea5933a3a45192af59d6e686..5c3ec16ff05bbbac5000c5fa8381dcd7612a67e0 100644 (file)
@@ -9,6 +9,9 @@ package org.opendaylight.raft.spi;
 
 import static java.util.Objects.requireNonNull;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.file.Path;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
@@ -26,6 +29,16 @@ public enum SnapshotFileFormat {
         public FilePlainSnapshotSource sourceForFile(final Path path) {
             return new FilePlainSnapshotSource(path);
         }
+
+        @Override
+        public InputStream decodeInput(final InputStream in) {
+            return in;
+        }
+
+        @Override
+        public OutputStream encodeOutput(final OutputStream out) {
+            return out;
+        }
     },
     /**
      * An LZ4-compressed file.
@@ -35,6 +48,22 @@ public enum SnapshotFileFormat {
         public FileLz4SnapshotSource sourceForFile(final Path path) {
             return new FileLz4SnapshotSource(path);
         }
+
+        @Override
+        public InputStream decodeInput(final InputStream in) throws IOException {
+            return Lz4Support.newDecompressInputStream(in);
+        }
+
+        @Override
+        public OutputStream encodeOutput(final OutputStream out) throws IOException {
+            // Note: 256KiB seems to be sweet spot between memory usage and compression ratio:
+            // - it is guaranteed to not trigger G1GC's humongous objects (which can kick in as soon as 512KiB byte[])
+            // - it provides significant improvement over 64KiB
+            // - 1MiB does not provide an improvement justifying the 4x memory consumption increase
+            // - yes, we are sensitive to buffer sizes: imagine having a 100 shards performing compression at the same
+            //   time :)
+            return Lz4Support.newCompressOutputStream(out, Lz4BlockSize.LZ4_256KB);
+        }
     };
 
     // Note: starts with ".", to make operations easier
@@ -61,6 +90,26 @@ public enum SnapshotFileFormat {
      */
     public abstract SnapshotSource sourceForFile(Path path);
 
+    /**
+     * Return an {@link InputStream} which produces plain snapshot bytes based on this format's stream obtained from
+     * specified input.
+     *
+     * @param in source input
+     * @return an {@link InputStream} producing snapshot bytes
+     * @throws IOException when an I/O error occurs
+     */
+    public abstract InputStream decodeInput(InputStream in) throws IOException;
+
+    /**
+     * Return an {@link OutputStream} which receives plain snapshot bytes and produces this format's stream into
+     * specified output.
+     *
+     * @param out target output
+     * @return an {@link OutputStream} to receive snapshot bytes
+     * @throws IOException when an I/O error occurs
+     */
+    public abstract OutputStream encodeOutput(OutputStream out) throws IOException;
+
     /**
      * Returns the {@link SnapshotFileFormat} corresponding to specified file name by examining its extension.
      *
index 3f7ee7ffc78803fd2ad6caa36fb5983207a7ebfe..848891795a03ebc68c370cdc63c5aa0a509cd1e2 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.raft;
 import java.time.Duration;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
+import org.opendaylight.raft.spi.SnapshotFileFormat;
 
 /**
  * Configuration Parameter interface for configuring the Raft consensus system. Any component using this implementation
@@ -19,8 +20,6 @@ import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
  * @author Kamal Rameshan
  */
 public interface ConfigParams {
-    int MEGABYTE = 1048576;
-
     /**
      * Returns the minimum number of entries to be present in the in-memory Raft log for a snapshot to be taken.
      *
@@ -154,4 +153,11 @@ public interface ConfigParams {
      * @return the threshold in terms of number of journal entries.
      */
     long getSyncIndexThreshold();
+
+    /**
+     * Returns the preferred {@link SnapshotFileFormat}.
+     *
+     * @return the preferred {@link SnapshotFileFormat}
+     */
+    @NonNull SnapshotFileFormat getPreferredFileFormat();
 }
index 1ee2515f163ca87a08f6aa4f1b3a014281db0ae5..d90342352b7f56fce590effb5de16adc464852d6 100644 (file)
@@ -17,6 +17,7 @@ import java.util.function.Supplier;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
+import org.opendaylight.raft.spi.SnapshotFileFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,10 +78,12 @@ public class DefaultConfigParamsImpl implements ConfigParams {
 
     private String tempFileDirectory = "";
 
-    private int fileBackedStreamingThreshold = 128 * MEGABYTE;
+    private int fileBackedStreamingThreshold = 128 * 1_048_576;
 
     private long syncIndexThreshold = 10;
 
+    private @NonNull SnapshotFileFormat preferredFileFormat = SnapshotFileFormat.PLAIN;
+
     public void setHeartBeatInterval(final Duration heartBeatInterval) {
         this.heartBeatInterval = requireNonNull(heartBeatInterval);
         electionTimeOutInterval = null;
@@ -239,6 +242,15 @@ public class DefaultConfigParamsImpl implements ConfigParams {
         this.syncIndexThreshold = syncIndexThreshold;
     }
 
+    @Override
+    public SnapshotFileFormat getPreferredFileFormat() {
+        return preferredFileFormat;
+    }
+
+    public void setPreferredFileFormat(final SnapshotFileFormat preferredFileFormat) {
+        this.preferredFileFormat = requireNonNull(preferredFileFormat);
+    }
+
     private RaftPolicy getPolicy() {
         if (Strings.isNullOrEmpty(DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass)) {
             LOG.debug("No custom RaftPolicy specified. Using DefaultRaftPolicy");
index b59825adc3d5857dc56bbf641dbf3b3b61def9dc..ec9f865de42b0674d4f4328a3527c02c657b52f0 100644 (file)
@@ -17,6 +17,7 @@ import org.apache.pekko.persistence.SnapshotProtocol;
 import org.apache.pekko.persistence.SnapshotSelectionCriteria;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.spi.EnabledRaftStorage;
+import org.opendaylight.raft.spi.SnapshotFileFormat;
 import org.opendaylight.raft.spi.SnapshotSource;
 
 /**
@@ -26,7 +27,8 @@ import org.opendaylight.raft.spi.SnapshotSource;
 final class PekkoRaftStorage extends EnabledRaftStorage {
     private final RaftActor actor;
 
-    PekkoRaftStorage(final RaftActor actor) {
+    PekkoRaftStorage(final RaftActor actor, final SnapshotFileFormat preferredFormat) {
+        super(preferredFormat);
         this.actor = requireNonNull(actor);
     }
 
index 43bc76b8a4969cca39715927a42244e7e66a06c7..8906ff352bd5ed40fbc8046eb0879a95ae6d3812 100644 (file)
@@ -17,6 +17,7 @@ import org.opendaylight.controller.cluster.raft.spi.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.spi.DisabledRaftStorage;
 import org.opendaylight.controller.cluster.raft.spi.EnabledRaftStorage;
 import org.opendaylight.controller.cluster.raft.spi.ForwardingDataPersistenceProvider;
+import org.opendaylight.raft.spi.SnapshotFileFormat;
 
 /**
  * Storage facade sitting between {@link RaftActor} and persistence provider, taking care of multiplexing between
@@ -46,9 +47,9 @@ final class PersistenceControl extends ForwardingDataPersistenceProvider {
         delegate = disabledStorage;
     }
 
-    PersistenceControl(final RaftActor raftActor) {
-        this(new DisabledRaftStorage(raftActor.memberId(), raftActor, raftActor.self()),
-            new PekkoRaftStorage(raftActor));
+    PersistenceControl(final RaftActor raftActor, final SnapshotFileFormat preferredFormat) {
+        this(new DisabledRaftStorage(raftActor.memberId(), raftActor, raftActor.self(), preferredFormat),
+            new PekkoRaftStorage(raftActor, preferredFormat));
     }
 
     @Override
index d79cb208f0c73dc94f2f2fb6b2795b5269ad3d78..3483d077fcd710a4fd13d6ca13f7b664f68b869a 100644 (file)
@@ -125,12 +125,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             final Map<String, String> peerAddresses, final Optional<ConfigParams> configParams,
             final short payloadVersion) {
         super(memberId);
+
+        final var config = configParams.orElseGet(DefaultConfigParamsImpl::new);
         localAccess = new LocalAccess(memberId, stateDir.resolve(memberId));
-        persistenceControl = new PersistenceControl(this);
+        persistenceControl = new PersistenceControl(this, config.getPreferredFileFormat());
 
-        context = new RaftActorContextImpl(self(), getContext(), localAccess, peerAddresses,
-            configParams.orElseGet(DefaultConfigParamsImpl::new), payloadVersion, persistenceControl,
-            this::handleApplyState, this::executeInSelf);
+        context = new RaftActorContextImpl(self(), getContext(), localAccess, peerAddresses, config, payloadVersion,
+            persistenceControl, this::handleApplyState, this::executeInSelf);
     }
 
     /**
index 55cbc915158cca9dc56190a46ec78a47af3eaa15..22e6627d3d4f5f230886f2cd8e82d57f48572a26 100644 (file)
@@ -51,7 +51,7 @@ final class ReplicatedLogImpl extends AbstractReplicatedLog {
         }
 
         final long absoluteThreshold = config.getSnapshotDataThreshold();
-        final long dataThreshold = absoluteThreshold != 0 ? absoluteThreshold * ConfigParams.MEGABYTE
+        final long dataThreshold = absoluteThreshold != 0 ? absoluteThreshold * 1_048_576
                 : context.getTotalMemory() * config.getSnapshotDataThresholdPercentage() / 100;
         return getDataSizeForSnapshotCheck() > dataThreshold;
     }
index 38dd078d854ccf05a0b4d2cebc670290a5aba4a6..6b0ef3b16d1b9fa0a3d992b688fbb7067ae18302 100644 (file)
@@ -360,7 +360,7 @@ public final class SnapshotManager {
 
         final var config = context.getConfigParams();
         final long absoluteThreshold = config.getSnapshotDataThreshold();
-        final long dataThreshold = absoluteThreshold != 0 ? absoluteThreshold * ConfigParams.MEGABYTE
+        final long dataThreshold = absoluteThreshold != 0 ? absoluteThreshold * 1_048_576
                 : context.getTotalMemory() * config.getSnapshotDataThresholdPercentage() / 100;
 
         final var replLog = context.getReplicatedLog();
index fb0c632b20a1131515ff6df9119bedb686e297a8..8c5070010367c9df4df640f5b7d7353a93f0167c 100644 (file)
@@ -17,6 +17,7 @@ import org.opendaylight.controller.cluster.common.actor.ExecuteInSelfActor;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.raft.spi.SnapshotFileFormat;
 import org.opendaylight.raft.spi.SnapshotSource;
 
 /**
@@ -44,7 +45,9 @@ public final class DisabledRaftStorage extends RaftStorage implements ImmediateD
     private final ExecuteInSelfActor actor;
     private final ActorRef actorRef;
 
-    public DisabledRaftStorage(final String memberId, final ExecuteInSelfActor actor, final ActorRef actorRef) {
+    public DisabledRaftStorage(final String memberId, final ExecuteInSelfActor actor, final ActorRef actorRef,
+            final SnapshotFileFormat preferredFormat) {
+        super(preferredFormat);
         this.memberId = requireNonNull(memberId);
         this.actor = requireNonNull(actor);
         this.actorRef = requireNonNull(actorRef);
index 342aa45f9af40602d20b7e6be8754483772e22d2..ee9e80c3668efe920f384baa2be782cf11250cef 100644 (file)
@@ -7,12 +7,19 @@
  */
 package org.opendaylight.controller.cluster.raft.spi;
 
+import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.raft.spi.SnapshotFileFormat;
 
 /**
  * A {@link RaftStorage} backing persistent mode of {@link RaftActor} operation.
  */
+@NonNullByDefault
 public abstract non-sealed class EnabledRaftStorage extends RaftStorage {
+    protected EnabledRaftStorage(final SnapshotFileFormat preferredFormat) {
+        super(preferredFormat);
+    }
+
     @Override
     public final boolean isRecoveryApplicable() {
         return true;
index d733d551bd179c16ee7ac2e9a7757ef73939ad5a..96d6ea5e98e5a2d3e4eaaeb6a0720e94f44eb88b 100644 (file)
@@ -18,6 +18,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
 import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.raft.spi.SnapshotFileFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,6 +29,12 @@ public abstract sealed class RaftStorage implements DataPersistenceProvider
         permits DisabledRaftStorage, EnabledRaftStorage {
     private static final Logger LOG = LoggerFactory.getLogger(RaftStorage.class);
 
+    private final @NonNull SnapshotFileFormat preferredFormat;
+
+    protected RaftStorage(final SnapshotFileFormat preferredFormat) {
+        this.preferredFormat = requireNonNull(preferredFormat);
+    }
+
     private ExecutorService executor;
 
     // FIXME: we should have the concept of being 'open', when we have a thread pool to perform the asynchronous part
@@ -111,6 +118,6 @@ public abstract sealed class RaftStorage implements DataPersistenceProvider
     }
 
     protected ToStringHelper addToStringAtrributes(final ToStringHelper helper) {
-        return helper.add("memberId", memberId());
+        return helper.add("memberId", memberId()).add("preferredFormat", preferredFormat);
     }
 }