Add optional lz4 compression for snapshots
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardSnapshotCohort.java
index 93e1d873579f0c02446b666f897cfcae640db525..c7bc20f7546599876d4f99a9ca07fc174ed836ed 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015, 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -7,9 +7,10 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static java.util.Objects.requireNonNull;
+
 import akka.actor.ActorContext;
 import akka.actor.ActorRef;
-import com.google.common.base.Preconditions;
 import com.google.common.io.ByteSource;
 import java.io.IOException;
 import java.io.ObjectInputStream;
@@ -23,6 +24,7 @@ import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.actors.ShardSnapshotActor;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.io.InputOutputStreamFactory;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
@@ -33,46 +35,54 @@ import org.slf4j.Logger;
  *
  * @author Thomas Pantelis
  */
-class ShardSnapshotCohort implements RaftActorSnapshotCohort {
+final class ShardSnapshotCohort implements RaftActorSnapshotCohort {
     private static final FrontendType SNAPSHOT_APPLY = FrontendType.forName("snapshot-apply");
 
+    private final InputOutputStreamFactory streamFactory;
     private final ActorRef snapshotActor;
     private final ShardDataTree store;
     private final String logId;
     private final Logger log;
 
-    private ShardSnapshotCohort(final LocalHistoryIdentifier applyHistoryId, final ActorRef snapshotActor,
-            final ShardDataTree store, final Logger log, final String logId) {
-        this.snapshotActor = Preconditions.checkNotNull(snapshotActor);
-        this.store = Preconditions.checkNotNull(store);
+    ShardSnapshotCohort(final InputOutputStreamFactory streamFactory, final LocalHistoryIdentifier applyHistoryId,
+            final ActorRef snapshotActor, final ShardDataTree store, final Logger log, final String logId) {
+        this.streamFactory = requireNonNull(streamFactory);
+        this.snapshotActor = requireNonNull(snapshotActor);
+        this.store = requireNonNull(store);
         this.log = log;
         this.logId = logId;
     }
 
     static ShardSnapshotCohort create(final ActorContext actorContext, final MemberName memberName,
-            final ShardDataTree store, final Logger log, final String logId) {
+            final ShardDataTree store, final Logger log, final String logId, final DatastoreContext context) {
         final LocalHistoryIdentifier applyHistoryId = new LocalHistoryIdentifier(ClientIdentifier.create(
             FrontendIdentifier.create(memberName, SNAPSHOT_APPLY), 0), 0);
         final String snapshotActorName = "shard-" + memberName.getName() + ':' + "snapshot-read";
 
+        final InputOutputStreamFactory streamFactory = context.isUseLz4Compression()
+                ? InputOutputStreamFactory.lz4("256KB") : InputOutputStreamFactory.simple();
         // Create a snapshot actor. This actor will act as a worker to offload snapshot serialization for all
         // requests.
-        final ActorRef snapshotActor = actorContext.actorOf(ShardSnapshotActor.props(), snapshotActorName);
+        final ActorRef snapshotActor = actorContext.actorOf(ShardSnapshotActor.props(streamFactory),
+                snapshotActorName);
 
-        return new ShardSnapshotCohort(applyHistoryId, snapshotActor, store, log, logId);
+        return new ShardSnapshotCohort(streamFactory, applyHistoryId, snapshotActor, store, log, logId);
     }
 
     @Override
     public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
         // Forward the request to the snapshot actor
-        ShardSnapshotActor.requestSnapshot(snapshotActor, store.takeStateSnapshot(), installSnapshotStream, actorRef);
+        final ShardDataTreeSnapshot snapshot = store.takeStateSnapshot();
+        log.debug("{}: requesting serialization of snapshot {}", logId, snapshot);
+
+        ShardSnapshotActor.requestSnapshot(snapshotActor, snapshot, installSnapshotStream, actorRef);
     }
 
     @Override
     @SuppressWarnings("checkstyle:IllegalCatch")
     public void applySnapshot(final Snapshot.State snapshotState) {
         if (!(snapshotState instanceof ShardSnapshotState)) {
-            log.debug("{}: applySnapshot ignoring snapshot: {}", snapshotState);
+            log.debug("{}: applySnapshot ignoring snapshot: {}", logId, snapshotState);
         }
 
         final ShardDataTreeSnapshot snapshot = ((ShardSnapshotState)snapshotState).getSnapshot();
@@ -94,9 +104,9 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort {
     }
 
     @Override
-    public State deserializeSnapshot(ByteSource snapshotBytes) throws IOException {
-        try (final ObjectInputStream in = new ObjectInputStream(snapshotBytes.openStream())) {
-            return new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(in));
+    public State deserializeSnapshot(final ByteSource snapshotBytes) throws IOException {
+        try (ObjectInputStream in = new ObjectInputStream(streamFactory.createInputStream(snapshotBytes))) {
+            return ShardDataTreeSnapshot.deserialize(in);
         }
     }
 }