X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=190f1bd409e6c69ad8f8d8df30a6eaee7a04b3a7;hb=d313c1a52799705222817dfcaeb2b6ab2a6f9146;hp=296ce2d24aaa24b3920d789697ddb784ccec7bea;hpb=15fa131be8b16703089a6d8508546120cf15d45d;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 296ce2d24a..190f1bd409 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -123,7 +123,7 @@ public abstract class RaftActor extends UntypedPersistentActor { @Override public void onReceiveRecover(Object message) { if (message instanceof SnapshotOffer) { - LOG.debug("SnapshotOffer called.."); + LOG.info("SnapshotOffer called.."); SnapshotOffer offer = (SnapshotOffer) message; Snapshot snapshot = (Snapshot) offer.snapshot(); @@ -135,16 +135,23 @@ public abstract class RaftActor extends UntypedPersistentActor { context.setReplicatedLog(replicatedLog); context.setLastApplied(snapshot.getLastAppliedIndex()); - LOG.debug("Applied snapshot to replicatedLog. " + - "snapshotIndex={}, snapshotTerm={}, journal-size={}", + LOG.info("Applied snapshot to replicatedLog. " + + "snapshotIndex={}, snapshotTerm={}, journal-size={}", replicatedLog.snapshotIndex, replicatedLog.snapshotTerm, - replicatedLog.size()); + replicatedLog.size() + ); // Apply the snapshot to the actors state applySnapshot(ByteString.copyFrom(snapshot.getState())); } else if (message instanceof ReplicatedLogEntry) { - replicatedLog.append((ReplicatedLogEntry) message); + ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message; + + // Apply State immediately + replicatedLog.append(logEntry); + applyState(null, "recovery", logEntry.getData()); + context.setLastApplied(logEntry.getIndex()); + context.setCommitIndex(logEntry.getIndex()); } else if (message instanceof DeleteEntries) { replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex()); } else if (message instanceof UpdateElectionTerm) { @@ -152,7 +159,8 @@ public abstract class RaftActor extends UntypedPersistentActor { } else if (message instanceof RecoveryCompleted) { LOG.debug( "RecoveryCompleted - Switching actor to Follower - " + - "Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " + + "Persistence Id = " + persistenceId() + + " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " + "journal-size={}", replicatedLog.lastIndex(), replicatedLog.snapshotIndex, replicatedLog.snapshotTerm, replicatedLog.size()); @@ -229,17 +237,17 @@ public abstract class RaftActor extends UntypedPersistentActor { context.removePeer(rrp.getName()); } else if (message instanceof CaptureSnapshot) { - LOG.debug("CaptureSnapshot received by actor"); + LOG.info("CaptureSnapshot received by actor"); CaptureSnapshot cs = (CaptureSnapshot)message; captureSnapshot = cs; createSnapshot(); } else if (message instanceof CaptureSnapshotReply){ - LOG.debug("CaptureSnapshotReply received by actor"); + LOG.info("CaptureSnapshotReply received by actor"); CaptureSnapshotReply csr = (CaptureSnapshotReply) message; ByteString stateInBytes = csr.getSnapshot(); - LOG.debug("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size()); + LOG.info("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size()); handleCaptureSnapshotReply(stateInBytes); } else { @@ -255,6 +263,8 @@ public abstract class RaftActor extends UntypedPersistentActor { if(oldBehavior != currentBehavior){ onStateChanged(); } + + onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId()); } } @@ -314,6 +324,10 @@ public abstract class RaftActor extends UntypedPersistentActor { protected ActorSelection getLeader(){ String leaderAddress = getLeaderAddress(); + if(leaderAddress == null){ + return null; + } + return context.actorSelection(leaderAddress); } @@ -415,6 +429,8 @@ public abstract class RaftActor extends UntypedPersistentActor { */ protected abstract void onStateChanged(); + protected void onLeaderChanged(String oldLeader, String newLeader){}; + private RaftActorBehavior switchBehavior(RaftState state) { if (currentBehavior != null) { if (currentBehavior.state() == state) {