Bug 7521: Add FileBackedOutputStream and use for snapshot chunking
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorContextImpl.java
index dfbffb726ab4f7a079d4efc3c770f8ab3dbd5bb9..b307195a7ae038a976beae15997376a30a4f590f 100644 (file)
@@ -24,8 +24,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Consumer;
 import java.util.function.LongSupplier;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
@@ -82,20 +86,25 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private Optional<Cluster> cluster;
 
+    private final Consumer<ApplyState> applyStateConsumer;
+
     public RaftActorContextImpl(ActorRef actor, ActorContext context, String id,
-            ElectionTerm termInformation, long commitIndex, long lastApplied, Map<String, String> peerAddresses,
-            ConfigParams configParams, DataPersistenceProvider persistenceProvider, Logger logger) {
+            @Nonnull ElectionTerm termInformation, long commitIndex, long lastApplied,
+            @Nonnull Map<String, String> peerAddresses,
+            @Nonnull ConfigParams configParams, @Nonnull DataPersistenceProvider persistenceProvider,
+            @Nonnull Consumer<ApplyState> applyStateConsumer, @Nonnull Logger logger) {
         this.actor = actor;
         this.context = context;
         this.id = id;
-        this.termInformation = termInformation;
+        this.termInformation = Preconditions.checkNotNull(termInformation);
         this.commitIndex = commitIndex;
         this.lastApplied = lastApplied;
-        this.configParams = configParams;
-        this.persistenceProvider = persistenceProvider;
-        this.log = logger;
+        this.configParams = Preconditions.checkNotNull(configParams);
+        this.persistenceProvider = Preconditions.checkNotNull(persistenceProvider);
+        this.log = Preconditions.checkNotNull(logger);
+        this.applyStateConsumer = Preconditions.checkNotNull(applyStateConsumer);
 
-        for (Map.Entry<String, String> e: peerAddresses.entrySet()) {
+        for (Map.Entry<String, String> e: Preconditions.checkNotNull(peerAddresses).entrySet()) {
             peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING));
         }
     }
@@ -386,6 +395,17 @@ public class RaftActorContextImpl implements RaftActorContext {
         this.currentBehavior = Preconditions.checkNotNull(behavior);
     }
 
+    @Override
+    public Consumer<ApplyState> getApplyStateConsumer() {
+        return applyStateConsumer;
+    }
+
+    @Override
+    public FileBackedOutputStream newFileBackedOutputStream() {
+        return new FileBackedOutputStream(configParams.getFileBackedStreamingThreshold(),
+                configParams.getTempFileDirectory());
+    }
+
     @SuppressWarnings("checkstyle:IllegalCatch")
     void close() {
         if (currentBehavior != null) {