import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.raft.spi.SnapshotFileFormat;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev250130.DataStoreProperties.ExportOnRecovery;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import scala.concurrent.duration.FiniteDuration;
noProgressTimeout = other.noProgressTimeout;
initialPayloadSerializedBufferCapacity = other.initialPayloadSerializedBufferCapacity;
useLz4Compression = other.useLz4Compression;
+ raftConfig.setPreferredFileFormat(useLz4Compression ? SnapshotFileFormat.LZ4 : SnapshotFileFormat.PLAIN);
exportOnRecovery = other.exportOnRecovery;
recoveryExportBaseDir = other.recoveryExportBaseDir;
public Builder useLz4Compression(final boolean value) {
datastoreContext.useLz4Compression = value;
+ datastoreContext.raftConfig.setPreferredFileFormat(
+ value ? SnapshotFileFormat.LZ4 : SnapshotFileFormat.PLAIN);
return this;
}
}
public Builder fileBackedStreamingThresholdInMegabytes(final int fileBackedStreamingThreshold) {
- datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE);
+ datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * 1_048_576);
return this;
}
import static java.util.Objects.requireNonNull;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.nio.file.Path;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
public FilePlainSnapshotSource sourceForFile(final Path path) {
return new FilePlainSnapshotSource(path);
}
+
+ @Override
+ public InputStream decodeInput(final InputStream in) {
+ return in;
+ }
+
+ @Override
+ public OutputStream encodeOutput(final OutputStream out) {
+ return out;
+ }
},
/**
* A plain file.
public FileLz4SnapshotSource sourceForFile(final Path path) {
return new FileLz4SnapshotSource(path);
}
+
+ @Override
+ public InputStream decodeInput(final InputStream in) throws IOException {
+ return Lz4Support.newDecompressInputStream(in);
+ }
+
+ @Override
+ public OutputStream encodeOutput(final OutputStream out) throws IOException {
+ // Note: 256KiB seems to be sweet spot between memory usage and compression ratio:
+ // - is is guaranteed to not trigger G1GCs humongous objects (which can in as soon as 512KiB byte[])
+ // - it provides significant improvement over 64KiB
+ // - 1MiB does not provide an improvement justifying the 4x memory consumption increase
+ // - yes, we are sensitive to buffer sizes: imagine having a 100 shards performing compression at the same
+ // time :)
+ return Lz4Support.newCompressOutputStream(out, Lz4BlockSize.LZ4_256KB);
+ }
};
// Note: starts with ".", to make operations easier
*/
public abstract SnapshotSource sourceForFile(Path path);
+ /**
+ * Return an {@link InputStream} which produces plain snapshot bytes based on this format's stream obtained from
+ * specified input.
+ *
+ * @param in source input
+ * @return an {@link InputStream} to producing snapshot bytes
+ * @throws IOException when an I/O error occurs
+ */
+ public abstract InputStream decodeInput(InputStream in) throws IOException;
+
+ /**
+ * Return an {@link OutputStream} which receives plain snapshot bytes and produces this format's stream into
+ * specified output.
+ *
+ * @param out target output
+ * @return an {@link OutputStream} to receive snapshot bytes
+ * @throws IOException when an I/O error occurs
+ */
+ public abstract OutputStream encodeOutput(OutputStream out) throws IOException;
+
/**
* Returns the {@link SnapshotFileFormat} corresponding to specified file name by examining its extension.
*
import java.time.Duration;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
+import org.opendaylight.raft.spi.SnapshotFileFormat;
/**
* Configuration Parameter interface for configuring the Raft consensus system. Any component using this implementation
* @author Kamal Rameshan
*/
public interface ConfigParams {
- int MEGABYTE = 1048576;
-
/**
* Returns the minimum number of entries to be present in the in-memory Raft log for a snapshot to be taken.
*
* @return the threshold in terms of number of journal entries.
*/
long getSyncIndexThreshold();
+
+ /**
+ * Retuns the preferred {@link SnapshotFileFormat}.
+ *
+ * @return the preferred {@link SnapshotFileFormat}
+ */
+ @NonNull SnapshotFileFormat getPreferredFileFormat();
}
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
+import org.opendaylight.raft.spi.SnapshotFileFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private String tempFileDirectory = "";
- private int fileBackedStreamingThreshold = 128 * MEGABYTE;
+ private int fileBackedStreamingThreshold = 128 * 1_048_576;
private long syncIndexThreshold = 10;
+ private @NonNull SnapshotFileFormat preferredFileFormat = SnapshotFileFormat.PLAIN;
+
public void setHeartBeatInterval(final Duration heartBeatInterval) {
this.heartBeatInterval = requireNonNull(heartBeatInterval);
electionTimeOutInterval = null;
this.syncIndexThreshold = syncIndexThreshold;
}
+ @Override
+ public SnapshotFileFormat getPreferredFileFormat() {
+ return preferredFileFormat;
+ }
+
+ public void setPreferredFileFormat(final SnapshotFileFormat preferredFileFormat) {
+ this.preferredFileFormat = requireNonNull(preferredFileFormat);
+ }
+
private RaftPolicy getPolicy() {
if (Strings.isNullOrEmpty(DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass)) {
LOG.debug("No custom RaftPolicy specified. Using DefaultRaftPolicy");
import org.apache.pekko.persistence.SnapshotSelectionCriteria;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.spi.EnabledRaftStorage;
+import org.opendaylight.raft.spi.SnapshotFileFormat;
import org.opendaylight.raft.spi.SnapshotSource;
/**
final class PekkoRaftStorage extends EnabledRaftStorage {
private final RaftActor actor;
- PekkoRaftStorage(final RaftActor actor) {
+ PekkoRaftStorage(final RaftActor actor, final SnapshotFileFormat preferredFormat) {
+ super(preferredFormat);
this.actor = requireNonNull(actor);
}
import org.opendaylight.controller.cluster.raft.spi.DisabledRaftStorage;
import org.opendaylight.controller.cluster.raft.spi.EnabledRaftStorage;
import org.opendaylight.controller.cluster.raft.spi.ForwardingDataPersistenceProvider;
+import org.opendaylight.raft.spi.SnapshotFileFormat;
/**
* Storage facade sitting between {@link RaftActor} and persistence provider, taking care of multiplexing between
delegate = disabledStorage;
}
- PersistenceControl(final RaftActor raftActor) {
- this(new DisabledRaftStorage(raftActor.memberId(), raftActor, raftActor.self()),
- new PekkoRaftStorage(raftActor));
+ PersistenceControl(final RaftActor raftActor, final SnapshotFileFormat preferredFormat) {
+ this(new DisabledRaftStorage(raftActor.memberId(), raftActor, raftActor.self(), preferredFormat),
+ new PekkoRaftStorage(raftActor, preferredFormat));
}
@Override
final Map<String, String> peerAddresses, final Optional<ConfigParams> configParams,
final short payloadVersion) {
super(memberId);
+
+ final var config = configParams.orElseGet(DefaultConfigParamsImpl::new);
localAccess = new LocalAccess(memberId, stateDir.resolve(memberId));
- persistenceControl = new PersistenceControl(this);
+ persistenceControl = new PersistenceControl(this, config.getPreferredFileFormat());
- context = new RaftActorContextImpl(self(), getContext(), localAccess, peerAddresses,
- configParams.orElseGet(DefaultConfigParamsImpl::new), payloadVersion, persistenceControl,
- this::handleApplyState, this::executeInSelf);
+ context = new RaftActorContextImpl(self(), getContext(), localAccess, peerAddresses, config, payloadVersion,
+ persistenceControl, this::handleApplyState, this::executeInSelf);
}
/**
}
final long absoluteThreshold = config.getSnapshotDataThreshold();
- final long dataThreshold = absoluteThreshold != 0 ? absoluteThreshold * ConfigParams.MEGABYTE
+ final long dataThreshold = absoluteThreshold != 0 ? absoluteThreshold * 1_048_576
: context.getTotalMemory() * config.getSnapshotDataThresholdPercentage() / 100;
return getDataSizeForSnapshotCheck() > dataThreshold;
}
final var config = context.getConfigParams();
final long absoluteThreshold = config.getSnapshotDataThreshold();
- final long dataThreshold = absoluteThreshold != 0 ? absoluteThreshold * ConfigParams.MEGABYTE
+ final long dataThreshold = absoluteThreshold != 0 ? absoluteThreshold * 1_048_576
: context.getTotalMemory() * config.getSnapshotDataThresholdPercentage() / 100;
final var replLog = context.getReplicatedLog();
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.raft.spi.SnapshotFileFormat;
import org.opendaylight.raft.spi.SnapshotSource;
/**
private final ExecuteInSelfActor actor;
private final ActorRef actorRef;
- public DisabledRaftStorage(final String memberId, final ExecuteInSelfActor actor, final ActorRef actorRef) {
+ public DisabledRaftStorage(final String memberId, final ExecuteInSelfActor actor, final ActorRef actorRef,
+ final SnapshotFileFormat preferredFormat) {
+ super(preferredFormat);
this.memberId = requireNonNull(memberId);
this.actor = requireNonNull(actor);
this.actorRef = requireNonNull(actorRef);
*/
package org.opendaylight.controller.cluster.raft.spi;
+import org.eclipse.jdt.annotation.NonNullByDefault;
import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.raft.spi.SnapshotFileFormat;
/**
* A {@link RaftStorage} backing persistent mode of {@link RaftActor} operation.
*/
+@NonNullByDefault
public abstract non-sealed class EnabledRaftStorage extends RaftStorage {
+ protected EnabledRaftStorage(final SnapshotFileFormat preferredFormat) {
+ super(preferredFormat);
+ }
+
@Override
public final boolean isRecoveryApplicable() {
return true;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.raft.spi.SnapshotFileFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
permits DisabledRaftStorage, EnabledRaftStorage {
private static final Logger LOG = LoggerFactory.getLogger(RaftStorage.class);
+ private final @NonNull SnapshotFileFormat preferredFormat;
+
+ protected RaftStorage(final SnapshotFileFormat preferredFormat) {
+ this.preferredFormat = requireNonNull(preferredFormat);
+ }
+
private ExecutorService executor;
// FIXME: we should have the concept of being 'open', when we have a thread pool to perform the asynchronous part
}
protected ToStringHelper addToStringAtrributes(final ToStringHelper helper) {
- return helper.add("memberId", memberId());
+ return helper.add("memberId", memberId()).add("preferredFormat", preferredFormat);
}
}