X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=8270f2949a67cc9fc00f5180dce41872ca6a8a47;hp=296ce2d24aaa24b3920d789697ddb784ccec7bea;hb=4c975594eb74127ca4c0018984d569ae52be77e5;hpb=0875a6258aa459ccf558529bdf1524c7c8986c54;ds=sidebyside diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 296ce2d24a..8270f2949a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -96,7 +96,7 @@ public abstract class RaftActor extends UntypedPersistentActor { * This context should NOT be passed directly to any other actor it is * only to be consumed by the RaftActorBehaviors */ - private RaftActorContext context; + protected RaftActorContext context; /** * The in-memory journal @@ -123,7 +123,7 @@ public abstract class RaftActor extends UntypedPersistentActor { @Override public void onReceiveRecover(Object message) { if (message instanceof SnapshotOffer) { - LOG.debug("SnapshotOffer called.."); + LOG.info("SnapshotOffer called.."); SnapshotOffer offer = (SnapshotOffer) message; Snapshot snapshot = (Snapshot) offer.snapshot(); @@ -134,25 +134,38 @@ public abstract class RaftActor extends UntypedPersistentActor { context.setReplicatedLog(replicatedLog); context.setLastApplied(snapshot.getLastAppliedIndex()); + context.setCommitIndex(snapshot.getLastAppliedIndex()); - LOG.debug("Applied snapshot to replicatedLog. " + - "snapshotIndex={}, snapshotTerm={}, journal-size={}", + LOG.info("Applied snapshot to replicatedLog. " + + "snapshotIndex={}, snapshotTerm={}, journal-size={}", replicatedLog.snapshotIndex, replicatedLog.snapshotTerm, - replicatedLog.size()); + replicatedLog.size() + ); // Apply the snapshot to the actors state applySnapshot(ByteString.copyFrom(snapshot.getState())); } else if (message instanceof ReplicatedLogEntry) { - replicatedLog.append((ReplicatedLogEntry) message); + ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message; + + // Apply State immediately + replicatedLog.append(logEntry); + applyState(null, "recovery", logEntry.getData()); + context.setLastApplied(logEntry.getIndex()); + context.setCommitIndex(logEntry.getIndex()); + } else if (message instanceof DeleteEntries) { replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex()); + } else if (message instanceof UpdateElectionTerm) { - context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor()); + context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), + ((UpdateElectionTerm) message).getVotedFor()); + } else if (message instanceof RecoveryCompleted) { - LOG.debug( + LOG.info( "RecoveryCompleted - Switching actor to Follower - " + - "Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " + + "Persistence Id = " + persistenceId() + + " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " + "journal-size={}", replicatedLog.lastIndex(), replicatedLog.snapshotIndex, replicatedLog.snapshotTerm, replicatedLog.size()); @@ -165,9 +178,11 @@ public abstract class RaftActor extends UntypedPersistentActor { if (message instanceof ApplyState){ ApplyState applyState = (ApplyState) message; - LOG.debug("Applying state for log index {} data {}", - applyState.getReplicatedLogEntry().getIndex(), - applyState.getReplicatedLogEntry().getData()); + if(LOG.isDebugEnabled()) { + LOG.debug("Applying state for log index {} data {}", + applyState.getReplicatedLogEntry().getIndex(), + applyState.getReplicatedLogEntry().getData()); + } applyState(applyState.getClientActor(), applyState.getIdentifier(), applyState.getReplicatedLogEntry().getData()); @@ -175,9 +190,12 @@ public abstract class RaftActor extends UntypedPersistentActor { } else if(message instanceof ApplySnapshot ) { Snapshot snapshot = ((ApplySnapshot) message).getSnapshot(); - LOG.debug("ApplySnapshot called on Follower Actor " + - "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(), - snapshot.getLastAppliedTerm()); + if(LOG.isDebugEnabled()) { + LOG.debug("ApplySnapshot called on Follower Actor " + + "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(), + snapshot.getLastAppliedTerm() + ); + } applySnapshot(ByteString.copyFrom(snapshot.getState())); //clears the followers log, sets the snapshot index to ensure adjusted-index works @@ -229,23 +247,25 @@ public abstract class RaftActor extends UntypedPersistentActor { context.removePeer(rrp.getName()); } else if (message instanceof CaptureSnapshot) { - LOG.debug("CaptureSnapshot received by actor"); + LOG.info("CaptureSnapshot received by actor"); CaptureSnapshot cs = (CaptureSnapshot)message; captureSnapshot = cs; createSnapshot(); } else if (message instanceof CaptureSnapshotReply){ - LOG.debug("CaptureSnapshotReply received by actor"); + LOG.info("CaptureSnapshotReply received by actor"); CaptureSnapshotReply csr = (CaptureSnapshotReply) message; ByteString stateInBytes = csr.getSnapshot(); - LOG.debug("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size()); + LOG.info("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size()); handleCaptureSnapshotReply(stateInBytes); } else { if (!(message instanceof AppendEntriesMessages.AppendEntries) && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) { - LOG.debug("onReceiveCommand: message:" + message.getClass()); + if(LOG.isDebugEnabled()) { + LOG.debug("onReceiveCommand: message:" + message.getClass()); + } } RaftState state = @@ -255,6 +275,8 @@ public abstract class RaftActor extends UntypedPersistentActor { if(oldBehavior != currentBehavior){ onStateChanged(); } + + onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId()); } } @@ -284,7 +306,9 @@ public abstract class RaftActor extends UntypedPersistentActor { context.getReplicatedLog().lastIndex() + 1, context.getTermInformation().getCurrentTerm(), data); - LOG.debug("Persist data {}", replicatedLogEntry); + if(LOG.isDebugEnabled()) { + LOG.debug("Persist data {}", replicatedLogEntry); + } replicatedLog .appendAndPersist(clientActor, identifier, replicatedLogEntry); @@ -314,6 +338,10 @@ public abstract class RaftActor extends UntypedPersistentActor { protected ActorSelection getLeader(){ String leaderAddress = getLeaderAddress(); + if(leaderAddress == null){ + return null; + } + return context.actorSelection(leaderAddress); } @@ -415,6 +443,8 @@ public abstract class RaftActor extends UntypedPersistentActor { */ protected abstract void onStateChanged(); + protected void onLeaderChanged(String oldLeader, String newLeader){}; + private RaftActorBehavior switchBehavior(RaftState state) { if (currentBehavior != null) { if (currentBehavior.state() == state) { @@ -467,8 +497,10 @@ public abstract class RaftActor extends UntypedPersistentActor { return null; } String peerAddress = context.getPeerAddress(leaderId); - LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = " - + peerAddress); + if(LOG.isDebugEnabled()) { + LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = " + + peerAddress); + } return peerAddress; } @@ -568,10 +600,13 @@ public abstract class RaftActor extends UntypedPersistentActor { lastAppliedTerm = lastAppliedEntry.getTerm(); } - LOG.debug("Snapshot Capture logSize: {}", journal.size()); - LOG.debug("Snapshot Capture lastApplied:{} ", context.getLastApplied()); - LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex); - LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm); + if(LOG.isDebugEnabled()) { + LOG.debug("Snapshot Capture logSize: {}", journal.size()); + LOG.debug("Snapshot Capture lastApplied:{} ", + context.getLastApplied()); + LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex); + LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm); + } // send a CaptureSnapshot to self to make the expensive operation async. getSelf().tell(new CaptureSnapshot( @@ -623,8 +658,9 @@ public abstract class RaftActor extends UntypedPersistentActor { } @Override public void update(long currentTerm, String votedFor) { - LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor); - + if(LOG.isDebugEnabled()) { + LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor); + } this.currentTerm = currentTerm; this.votedFor = votedFor; }