<groupId>org.opendaylight.yangtools</groupId>
<artifactId>yang-data-codec-binfmt</artifactId>
</dependency>
+
+ <!-- Compression -->
+ <dependency>
+ <groupId>org.lz4</groupId>
+ <artifactId>lz4-java</artifactId>
+ <version>1.7.1</version>
+ </dependency>
</dependencies>
<build>
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.io;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.annotations.Beta;
+import com.google.common.io.ByteSource;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import net.jpountz.lz4.LZ4FrameOutputStream;
+import org.eclipse.jdt.annotation.NonNull;
+
+/**
+ * Factory for the input/output streams used to persist and load snapshots, optionally applying
+ * LZ4 compression. Obtain instances via {@link #simple()} or one of the {@code lz4(...)} methods.
+ */
+@Beta
+public abstract class InputOutputStreamFactory {
+    InputOutputStreamFactory() {
+        // Hidden on purpose
+    }
+
+    /**
+     * Return a factory producing plain (uncompressed) streams.
+     *
+     * @return a pass-through stream factory
+     */
+    public static @NonNull InputOutputStreamFactory simple() {
+        return PlainInputOutputStreamSupport.INSTANCE;
+    }
+
+    /**
+     * Return a factory producing LZ4-compressed streams.
+     *
+     * @param blockSize frame block size: one of "64KB", "256KB", "1MB" or "4MB"
+     * @return an LZ4 stream factory
+     * @throws IllegalArgumentException if {@code blockSize} is not a recognized size
+     */
+    public static @NonNull InputOutputStreamFactory lz4(final String blockSize) {
+        try {
+            return lz4(LZ4FrameOutputStream.BLOCKSIZE.valueOf("SIZE_" + blockSize));
+        } catch (IllegalArgumentException e) {
+            // Translate the cryptic enum failure ("No enum constant ... SIZE_...") into a message
+            // that points the operator at the valid configuration values.
+            throw new IllegalArgumentException(
+                "Invalid LZ4 block size \"" + blockSize + "\", expected one of: 64KB, 256KB, 1MB, 4MB", e);
+        }
+    }
+
+    /**
+     * Return a factory producing LZ4-compressed streams using the specified frame block size.
+     *
+     * @param blockSize LZ4 frame block size
+     * @return an LZ4 stream factory
+     * @throws NullPointerException if {@code blockSize} is null
+     */
+    public static @NonNull InputOutputStreamFactory lz4(final LZ4FrameOutputStream.BLOCKSIZE blockSize) {
+        return new LZ4InputOutputStreamSupport(requireNonNull(blockSize));
+    }
+
+    /**
+     * Open an input stream over the contents of a {@link ByteSource}.
+     *
+     * @param input source bytes
+     * @return an InputStream, possibly decompressing
+     * @throws IOException if the source cannot be opened
+     */
+    public abstract @NonNull InputStream createInputStream(ByteSource input) throws IOException;
+
+    /**
+     * Open an input stream over the contents of a file.
+     *
+     * @param file source file
+     * @return an InputStream, possibly decompressing
+     * @throws IOException if the file cannot be opened
+     */
+    public abstract @NonNull InputStream createInputStream(File file) throws IOException;
+
+    /**
+     * Open an output stream writing to a file.
+     *
+     * @param file target file
+     * @return an OutputStream, possibly compressing
+     * @throws IOException if the file cannot be opened
+     */
+    public abstract @NonNull OutputStream createOutputStream(File file) throws IOException;
+
+    /**
+     * Wrap an existing output stream, applying this factory's encoding (if any).
+     *
+     * @param output stream to wrap
+     * @return the wrapped stream, or {@code output} itself for plain factories
+     * @throws IOException if the wrapper cannot be created
+     */
+    public abstract @NonNull OutputStream wrapOutputStream(OutputStream output) throws IOException;
+
+    // Shared plain-stream implementations, used both by the plain factory and as the
+    // fallback path when LZ4 decoding of a file fails.
+    static @NonNull BufferedInputStream defaultCreateInputStream(final File file) throws FileNotFoundException {
+        return new BufferedInputStream(new FileInputStream(file));
+    }
+
+    static @NonNull BufferedOutputStream defaultCreateOutputStream(final File file) throws FileNotFoundException {
+        return new BufferedOutputStream(new FileOutputStream(file));
+    }
+}
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.io;
+
+import com.google.common.io.ByteSource;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FrameInputStream;
+import net.jpountz.lz4.LZ4FrameOutputStream;
+import net.jpountz.lz4.LZ4FrameOutputStream.FLG.Bits;
+import net.jpountz.xxhash.XXHashFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// LZ4-framing implementation of InputOutputStreamFactory. Output streams wrap the target in an
+// LZ4 frame with block-independence; input streams attempt LZ4 decompression and fall back to a
+// plain stream when the source is not LZ4-framed (e.g. data written before compression was enabled).
+final class LZ4InputOutputStreamSupport extends InputOutputStreamFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(LZ4InputOutputStreamSupport.class);
+ // fastestInstance() selects the best available implementation (native/unsafe/safe) at runtime
+ private static final LZ4Factory LZ4_FACTORY = LZ4Factory.fastestInstance();
+ private static final XXHashFactory HASH_FACTORY = XXHashFactory.fastestInstance();
+
+ // Block size used when writing frames; reads infer the size from the frame header instead
+ private final LZ4FrameOutputStream.BLOCKSIZE blocksize;
+
+ LZ4InputOutputStreamSupport(final LZ4FrameOutputStream.BLOCKSIZE blocksize) {
+ this.blocksize = blocksize;
+ }
+
+ // Open the source for LZ4 decompression. The LZ4FrameInputStream constructor reads and validates
+ // the frame header, so a non-LZ4 source fails right here; we then close the partially-consumed
+ // stream and re-open the source as a plain buffered stream.
+ @Override
+ public InputStream createInputStream(final ByteSource input) throws IOException {
+ final InputStream stream = input.openStream();
+ try {
+ return new LZ4FrameInputStream(stream, LZ4_FACTORY.safeDecompressor(), HASH_FACTORY.hash32());
+ } catch (IOException e) {
+ stream.close();
+ LOG.warn("Error loading with lz4 decompression, using default one", e);
+ return input.openBufferedStream();
+ }
+ }
+
+ // Same fallback strategy as the ByteSource variant: close the failed LZ4 attempt, then re-open
+ // the file through the plain buffered-stream path.
+ @Override
+ public InputStream createInputStream(final File file) throws IOException {
+ final FileInputStream fileInput = new FileInputStream(file);
+ try {
+ return new LZ4FrameInputStream(fileInput, LZ4_FACTORY.safeDecompressor(), HASH_FACTORY.hash32());
+ } catch (IOException e) {
+ fileInput.close();
+ LOG.warn("Error loading file with lz4 decompression, using default one", e);
+ return defaultCreateInputStream(file);
+ }
+ }
+
+ // knownSize of -1 means the total uncompressed length is not recorded in the frame header
+ @Override
+ public OutputStream createOutputStream(final File file) throws IOException {
+ return new LZ4FrameOutputStream(new FileOutputStream(file), blocksize, -1, LZ4_FACTORY.fastCompressor(),
+ HASH_FACTORY.hash32(), Bits.BLOCK_INDEPENDENCE);
+ }
+
+ @Override
+ public OutputStream wrapOutputStream(final OutputStream output) throws IOException {
+ return new LZ4FrameOutputStream(output, blocksize, -1, LZ4_FACTORY.fastCompressor(), HASH_FACTORY.hash32(),
+ Bits.BLOCK_INDEPENDENCE);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.io;
+
+import com.google.common.io.ByteSource;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.eclipse.jdt.annotation.NonNull;
+
+// Pass-through implementation of InputOutputStreamFactory: no compression, streams are plain
+// (buffered where the default helpers provide buffering). Stateless, hence a shared singleton.
+final class PlainInputOutputStreamSupport extends InputOutputStreamFactory {
+ static final @NonNull PlainInputOutputStreamSupport INSTANCE = new PlainInputOutputStreamSupport();
+
+ private PlainInputOutputStreamSupport() {
+ // Hidden on purpose
+ }
+
+ @Override
+ public InputStream createInputStream(final ByteSource input) throws IOException {
+ return input.openBufferedStream();
+ }
+
+ @Override
+ public InputStream createInputStream(final File file) throws IOException {
+ return defaultCreateInputStream(file);
+ }
+
+ @Override
+ public OutputStream createOutputStream(final File file) throws IOException {
+ return defaultCreateOutputStream(file);
+ }
+
+ // Identity wrap: plain output needs no additional encoding layer
+ @Override
+ public OutputStream wrapOutputStream(final OutputStream output) throws IOException {
+ return output;
+ }
+}
import com.google.common.io.ByteStreams;
import com.typesafe.config.Config;
import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.controller.cluster.io.InputOutputStreamFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext;
*/
public class LocalSnapshotStore extends SnapshotStore {
private static final Logger LOG = LoggerFactory.getLogger(LocalSnapshotStore.class);
-
private static final int PERSISTENCE_ID_START_INDEX = "snapshot-".length();
+ private final InputOutputStreamFactory streamFactory;
private final ExecutionContext executionContext;
private final int maxLoadAttempts;
private final File snapshotDir;
this.executionContext = context().system().dispatchers().lookup(config.getString("stream-dispatcher"));
snapshotDir = new File(config.getString("dir"));
- int localMaxLoadAttempts = config.getInt("max-load-attempts");
+ final int localMaxLoadAttempts = config.getInt("max-load-attempts");
maxLoadAttempts = localMaxLoadAttempts > 0 ? localMaxLoadAttempts : 1;
+ if (config.getBoolean("use-lz4-compression")) {
+ final String size = config.getString("lz4-blocksize");
+ streamFactory = InputOutputStreamFactory.lz4(size);
+ LOG.debug("Using LZ4 Input/Output Stream, blocksize: {}", size);
+ } else {
+ streamFactory = InputOutputStreamFactory.simple();
+ LOG.debug("Using plain Input/Output Stream");
+ }
+
LOG.debug("LocalSnapshotStore ctor: snapshotDir: {}, maxLoadAttempts: {}", snapshotDir, maxLoadAttempts);
}
private Object deserialize(final File file) throws IOException {
return JavaSerializer.currentSystem().withValue((ExtendedActorSystem) context().system(),
(Callable<Object>) () -> {
- try (ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(new FileInputStream(file)))) {
+ try (ObjectInputStream in = new ObjectInputStream(streamFactory.createInputStream(file))) {
return in.readObject();
} catch (ClassNotFoundException e) {
throw new IOException("Error loading snapshot file " + file, e);
LOG.debug("Saving to temp file: {}", temp);
- try (ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(temp)))) {
+ try (ObjectOutputStream out = new ObjectOutputStream(streamFactory.createOutputStream(temp))) {
out.writeObject(snapshot);
} catch (IOException e) {
LOG.error("Error saving snapshot file {}. Deleting file..", temp, e);
snapshot-store.local.class = "org.opendaylight.controller.cluster.persistence.LocalSnapshotStore"
snapshot-store.plugin = akka.persistence.snapshot-store.local
snapshot-store.local.dir = "target/snapshots"
+ snapshot-store.local.use-lz4-compression = false
}
}
# The relative path is always relative to KARAF_HOME.
# snapshot-store.local.dir = "target/snapshots"
+
+ # Use lz4 compression for LocalSnapshotStore snapshots
+ snapshot-store.local.use-lz4-compression = false
+ # Size of blocks for lz4 compression: 64KB, 256KB, 1MB or 4MB
+ snapshot-store.local.lz4-blocksize = 256KB
}
disable-default-actor-system-quarantined-event-handling = "false"
}
#recovery-snapshot-interval-seconds=0
# Option to take a snapshot when the entire DataTree root or top-level container is overwritten
-snapshot-on-root-overwrite=false
\ No newline at end of file
+snapshot-on-root-overwrite=false
+
+# Enable lz4 compression for snapshots sent from leader to followers
+#use-lz4-compression=true
private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS;
private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS;
private int initialPayloadSerializedBufferCapacity = DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY;
+ private boolean useLz4Compression = false;
public static Set<String> getGlobalDatastoreNames() {
return GLOBAL_DATASTORE_NAMES;
this.requestTimeout = other.requestTimeout;
this.noProgressTimeout = other.noProgressTimeout;
this.initialPayloadSerializedBufferCapacity = other.initialPayloadSerializedBufferCapacity;
+ this.useLz4Compression = other.useLz4Compression;
setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
return useTellBasedProtocol;
}
+ public boolean isUseLz4Compression() {
+ return useLz4Compression;
+ }
+
@Override
public int getMaximumMessageSliceSize() {
return maximumMessageSliceSize;
return this;
}
+ public Builder useLz4Compression(final boolean value) {
+ datastoreContext.useLz4Compression = value;
+ return this;
+ }
+
/**
* For unit tests only.
*/
self(), getContext(), shardMBean, builder.getId().getShardName());
snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG,
- this.name);
+ this.name, datastoreContext);
messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
import org.opendaylight.controller.cluster.datastore.actors.ShardSnapshotActor;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.io.InputOutputStreamFactory;
import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
final class ShardSnapshotCohort implements RaftActorSnapshotCohort {
private static final FrontendType SNAPSHOT_APPLY = FrontendType.forName("snapshot-apply");
+ private final InputOutputStreamFactory streamFactory;
private final ActorRef snapshotActor;
private final ShardDataTree store;
private final String logId;
private final Logger log;
- private ShardSnapshotCohort(final LocalHistoryIdentifier applyHistoryId, final ActorRef snapshotActor,
- final ShardDataTree store, final Logger log, final String logId) {
+ ShardSnapshotCohort(final InputOutputStreamFactory streamFactory, final LocalHistoryIdentifier applyHistoryId,
+ final ActorRef snapshotActor, final ShardDataTree store, final Logger log, final String logId) {
+ this.streamFactory = requireNonNull(streamFactory);
this.snapshotActor = requireNonNull(snapshotActor);
this.store = requireNonNull(store);
this.log = log;
}
static ShardSnapshotCohort create(final ActorContext actorContext, final MemberName memberName,
- final ShardDataTree store, final Logger log, final String logId) {
+ final ShardDataTree store, final Logger log, final String logId, final DatastoreContext context) {
final LocalHistoryIdentifier applyHistoryId = new LocalHistoryIdentifier(ClientIdentifier.create(
FrontendIdentifier.create(memberName, SNAPSHOT_APPLY), 0), 0);
final String snapshotActorName = "shard-" + memberName.getName() + ':' + "snapshot-read";
+ final InputOutputStreamFactory streamFactory = context.isUseLz4Compression()
+ ? InputOutputStreamFactory.lz4("256KB") : InputOutputStreamFactory.simple();
// Create a snapshot actor. This actor will act as a worker to offload snapshot serialization for all
// requests.
- final ActorRef snapshotActor = actorContext.actorOf(ShardSnapshotActor.props(), snapshotActorName);
+ final ActorRef snapshotActor = actorContext.actorOf(ShardSnapshotActor.props(streamFactory),
+ snapshotActorName);
- return new ShardSnapshotCohort(applyHistoryId, snapshotActor, store, log, logId);
+ return new ShardSnapshotCohort(streamFactory, applyHistoryId, snapshotActor, store, log, logId);
}
@Override
@Override
public State deserializeSnapshot(final ByteSource snapshotBytes) throws IOException {
- try (ObjectInputStream in = new ObjectInputStream(snapshotBytes.openStream())) {
+ try (ObjectInputStream in = new ObjectInputStream(streamFactory.createInputStream(snapshotBytes))) {
return ShardDataTreeSnapshot.deserialize(in);
}
}
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.io.InputOutputStreamFactory;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
/**
//actor name override used for metering. This does not change the "real" actor name
private static final String ACTOR_NAME_FOR_METERING = "shard-snapshot";
- private ShardSnapshotActor() {
+ private final InputOutputStreamFactory streamFactory;
+
+ private ShardSnapshotActor(final InputOutputStreamFactory streamFactory) {
super(ACTOR_NAME_FOR_METERING);
+ this.streamFactory = requireNonNull(streamFactory);
}
@Override
private void onSerializeSnapshot(final SerializeSnapshot request) {
Optional<OutputStream> installSnapshotStream = request.getInstallSnapshotStream();
if (installSnapshotStream.isPresent()) {
- try (ObjectOutputStream out = new ObjectOutputStream(installSnapshotStream.get())) {
+ try (ObjectOutputStream out = getOutputStream(installSnapshotStream.get())) {
request.getSnapshot().serialize(out);
} catch (IOException e) {
// TODO - we should communicate the failure in the CaptureSnapshotReply.
installSnapshotStream), ActorRef.noSender());
}
+ private ObjectOutputStream getOutputStream(final OutputStream outputStream) throws IOException {
+ return new ObjectOutputStream(streamFactory.wrapOutputStream(outputStream));
+ }
+
/**
* Sends a request to a ShardSnapshotActor to process a snapshot and send a CaptureSnapshotReply.
*
snapshotActor.tell(new SerializeSnapshot(snapshot, installSnapshotStream, replyTo), ActorRef.noSender());
}
- public static Props props() {
- return Props.create(ShardSnapshotActor.class);
+ public static Props props(final InputOutputStreamFactory streamFactory) {
+ return Props.create(ShardSnapshotActor.class, streamFactory);
}
}
type non-zero-uint32-type;
description "The initial buffer capacity, in bytes, to use when serializing message payloads.";
}
+
+ leaf use-lz4-compression {
+ default false;
+ type boolean;
+ description "Use lz4 compression for snapshots sent from the leader to followers. For snapshots
+ stored by LocalSnapshotStore, use the akka.conf configuration instead.";
+ }
}
container data-store-properties-container {
import akka.actor.ActorRef;
import akka.testkit.javadsl.TestKit;
-import java.io.ByteArrayInputStream;
+import com.google.common.io.ByteSource;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.time.Duration;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.io.InputOutputStreamFactory;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
public class ShardSnapshotActorTest extends AbstractActorTest {
+ private static final InputOutputStreamFactory STREAM_FACTORY = InputOutputStreamFactory.simple();
+
private static final NormalizedNode<?, ?> DATA = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
private static void testSerializeSnapshot(final String testName, final ShardDataTreeSnapshot snapshot,
final boolean withInstallSnapshot) throws Exception {
final TestKit kit = new TestKit(getSystem());
- final ActorRef snapshotActor = getSystem().actorOf(ShardSnapshotActor.props(), testName);
+ final ActorRef snapshotActor = getSystem().actorOf(ShardSnapshotActor.props(STREAM_FACTORY), testName);
kit.watch(snapshotActor);
final NormalizedNode<?, ?> expectedRoot = snapshot.getRootNode().get();
if (installSnapshotStream != null) {
final ShardDataTreeSnapshot deserialized;
- try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(
- installSnapshotStream.toByteArray()))) {
+ try (ObjectInputStream in = new ObjectInputStream(STREAM_FACTORY.createInputStream(
+ ByteSource.wrap(installSnapshotStream.toByteArray())))) {
deserialized = ShardDataTreeSnapshot.deserialize(in).getSnapshot();
}