Merge "BUG 1853 : Clustered Data Store causes Out of Memory"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index 296ce2d24aaa24b3920d789697ddb784ccec7bea..190f1bd409e6c69ad8f8d8df30a6eaee7a04b3a7 100644 (file)
@@ -123,7 +123,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
     @Override public void onReceiveRecover(Object message) {
         if (message instanceof SnapshotOffer) {
-            LOG.debug("SnapshotOffer called..");
+            LOG.info("SnapshotOffer called..");
             SnapshotOffer offer = (SnapshotOffer) message;
             Snapshot snapshot = (Snapshot) offer.snapshot();
 
@@ -135,16 +135,23 @@ public abstract class RaftActor extends UntypedPersistentActor {
             context.setReplicatedLog(replicatedLog);
             context.setLastApplied(snapshot.getLastAppliedIndex());
 
-            LOG.debug("Applied snapshot to replicatedLog. " +
-                "snapshotIndex={}, snapshotTerm={}, journal-size={}",
+            LOG.info("Applied snapshot to replicatedLog. " +
+                    "snapshotIndex={}, snapshotTerm={}, journal-size={}",
                 replicatedLog.snapshotIndex, replicatedLog.snapshotTerm,
-                replicatedLog.size());
+                replicatedLog.size()
+            );
 
             // Apply the snapshot to the actors state
             applySnapshot(ByteString.copyFrom(snapshot.getState()));
 
         } else if (message instanceof ReplicatedLogEntry) {
-            replicatedLog.append((ReplicatedLogEntry) message);
+            ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message;
+
+            // Apply State immediately
+            replicatedLog.append(logEntry);
+            applyState(null, "recovery", logEntry.getData());
+            context.setLastApplied(logEntry.getIndex());
+            context.setCommitIndex(logEntry.getIndex());
         } else if (message instanceof DeleteEntries) {
             replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
         } else if (message instanceof UpdateElectionTerm) {
@@ -152,7 +159,8 @@ public abstract class RaftActor extends UntypedPersistentActor {
         } else if (message instanceof RecoveryCompleted) {
             LOG.debug(
                 "RecoveryCompleted - Switching actor to Follower - " +
-                    "Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
+                    "Persistence Id =  " + persistenceId() +
+                    " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
                     "journal-size={}",
                 replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
                 replicatedLog.snapshotTerm, replicatedLog.size());
@@ -229,17 +237,17 @@ public abstract class RaftActor extends UntypedPersistentActor {
             context.removePeer(rrp.getName());
 
         } else if (message instanceof CaptureSnapshot) {
-            LOG.debug("CaptureSnapshot received by actor");
+            LOG.info("CaptureSnapshot received by actor");
             CaptureSnapshot cs = (CaptureSnapshot)message;
             captureSnapshot = cs;
             createSnapshot();
 
         } else if (message instanceof CaptureSnapshotReply){
-            LOG.debug("CaptureSnapshotReply received by actor");
+            LOG.info("CaptureSnapshotReply received by actor");
             CaptureSnapshotReply csr = (CaptureSnapshotReply) message;
 
             ByteString stateInBytes = csr.getSnapshot();
-            LOG.debug("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
+            LOG.info("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
             handleCaptureSnapshotReply(stateInBytes);
 
         } else {
@@ -255,6 +263,8 @@ public abstract class RaftActor extends UntypedPersistentActor {
             if(oldBehavior != currentBehavior){
                 onStateChanged();
             }
+
+            onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId());
         }
     }
 
@@ -314,6 +324,10 @@ public abstract class RaftActor extends UntypedPersistentActor {
     protected ActorSelection getLeader(){
         String leaderAddress = getLeaderAddress();
 
+        if(leaderAddress == null){
+            return null;
+        }
+
         return context.actorSelection(leaderAddress);
     }
 
@@ -415,6 +429,8 @@ public abstract class RaftActor extends UntypedPersistentActor {
      */
     protected abstract void onStateChanged();
 
+    protected void onLeaderChanged(String oldLeader, String newLeader){};
+
     private RaftActorBehavior switchBehavior(RaftState state) {
         if (currentBehavior != null) {
             if (currentBehavior.state() == state) {