Bug 2264: Use streaming for snapshots
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index 164c2cea561349cf178d63965eccd0a313e29b4e..c256c822a420e3a22b5a351778d58a88e73a9e8d 100644 (file)
@@ -199,7 +199,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         timer.start();
 
         // Apply the snapshot to the actors state
-        applyRecoverySnapshot(ByteString.copyFrom(snapshot.getState()));
+        applyRecoverySnapshot(snapshot.getState());
 
         timer.stop();
         LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
@@ -317,7 +317,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                     snapshot.getLastAppliedTerm()
                 );
             }
-            applySnapshot(ByteString.copyFrom(snapshot.getState()));
+
+            applySnapshot(snapshot.getState());
 
             //clears the followers log, sets the snapshot index to ensure adjusted-index works
             replicatedLog = new ReplicatedLogImpl(snapshot);
@@ -354,17 +355,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         } else if (message instanceof CaptureSnapshot) {
             LOG.info("CaptureSnapshot received by actor");
-            CaptureSnapshot cs = (CaptureSnapshot)message;
-            captureSnapshot = cs;
-            createSnapshot();
 
-        } else if (message instanceof CaptureSnapshotReply){
-            LOG.info("CaptureSnapshotReply received by actor");
-            CaptureSnapshotReply csr = (CaptureSnapshotReply) message;
+            if(captureSnapshot == null) {
+                captureSnapshot = (CaptureSnapshot)message;
+                createSnapshot();
+            }
 
-            ByteString stateInBytes = csr.getSnapshot();
-            LOG.info("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
-            handleCaptureSnapshotReply(stateInBytes);
+        } else if (message instanceof CaptureSnapshotReply){
+            handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
 
         } else {
             if (!(message instanceof AppendEntriesMessages.AppendEntries)
@@ -583,7 +581,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      *
      * @param snapshot A snapshot of the state of the actor
      */
-    protected abstract void applyRecoverySnapshot(ByteString snapshot);
+    protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
 
     /**
      * This method is called during recovery at the end of a batch to apply the current batched
@@ -612,9 +610,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      * operations when the derived actor is out of sync with it's peers
      * and the only way to bring it in sync is by applying a snapshot
      *
-     * @param snapshot A snapshot of the state of the actor
+     * @param snapshotBytes A snapshot of the state of the actor
      */
-    protected abstract void applySnapshot(ByteString snapshot);
+    protected abstract void applySnapshot(byte[] snapshotBytes);
 
     /**
      * This method will be called by the RaftActor when the state of the
@@ -661,11 +659,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         return peerAddress;
     }
 
-    private void handleCaptureSnapshotReply(ByteString stateInBytes) {
+    private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
+        LOG.info("CaptureSnapshotReply received by actor: snapshot size {}", snapshotBytes.length);
+
         // create a snapshot object from the state provided and save it
         // when snapshot is saved async, SaveSnapshotSuccess is raised.
 
-        Snapshot sn = Snapshot.create(stateInBytes.toByteArray(),
+        Snapshot sn = Snapshot.create(snapshotBytes,
             context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
             captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
             captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
@@ -687,7 +687,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
             // this would be call straight to the leader and won't initiate in serialization
-            currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(stateInBytes));
+            currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(
+                    ByteString.copyFrom(snapshotBytes)));
         }
 
         captureSnapshot = null;