X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FFollower.java;h=b1c73f6f4155e045935fa37912245e3d03d45fc9;hb=3a71a222b896b9d07e638af62300180799bdac67;hp=1cfdf9dba8b912a5bc23f78b85c447621298d908;hpb=592ff3cd9e1b80d4cbd9ca45ed4a2371377a7622;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 1cfdf9dba8..b1c73f6f41 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -9,8 +9,9 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; -import akka.event.LoggingAdapter; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; +import java.util.ArrayList; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; @@ -24,8 +25,6 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; -import java.util.ArrayList; - /** * The behavior of a RaftActor in the Follower state *
@@ -37,19 +36,16 @@ import java.util.ArrayList; * */ public class Follower extends AbstractRaftActorBehavior { - private ByteString snapshotChunksCollected = ByteString.EMPTY; - private final LoggingAdapter LOG; + private SnapshotTracker snapshotTracker = null; public Follower(RaftActorContext context) { super(context); - LOG = context.getLogger(); - scheduleElection(electionDuration()); } - @Override protected RaftState handleAppendEntries(ActorRef sender, + @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) { @@ -133,15 +129,14 @@ public class Follower extends AbstractRaftActorBehavior { new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), lastTerm()), actor() ); - return state(); + return this; } if (appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) { if(LOG.isDebugEnabled()) { LOG.debug( - "Number of entries to be appended = " + appendEntries - .getEntries().size() + "Number of entries to be appended = {}", appendEntries.getEntries().size() ); } @@ -168,8 +163,7 @@ public class Follower extends AbstractRaftActorBehavior { if(LOG.isDebugEnabled()) { LOG.debug( - "Removing entries from log starting at " - + matchEntry.getIndex() + "Removing entries from log starting at {}", matchEntry.getIndex() ); } @@ -181,9 +175,7 @@ public class Follower extends AbstractRaftActorBehavior { } if(LOG.isDebugEnabled()) { - context.getLogger().debug( - "After cleanup entries to be added from = " + (addEntriesFrom - + lastIndex()) + LOG.debug("After cleanup entries to be added from = {}", (addEntriesFrom + lastIndex()) ); } @@ -191,17 +183,14 @@ public class Follower extends AbstractRaftActorBehavior { for (int i = addEntriesFrom; i < appendEntries.getEntries().size(); i++) { - context.getLogger().info( - "Append entry to log " + appendEntries.getEntries().get( - i).getData() - .toString() - ); - context.getReplicatedLog() - .appendAndPersist(appendEntries.getEntries().get(i)); + if(LOG.isDebugEnabled()) { + LOG.debug("Append entry to log {}", appendEntries.getEntries().get(i).getData()); + } + context.getReplicatedLog().appendAndPersist(appendEntries.getEntries().get(i)); } if(LOG.isDebugEnabled()) { - LOG.debug("Log size is now " + context.getReplicatedLog().size()); + LOG.debug("Log size is now {}", context.getReplicatedLog().size()); } } @@ -216,7 +205,7 @@ public class Follower extends AbstractRaftActorBehavior { if (prevCommitIndex != context.getCommitIndex()) { if(LOG.isDebugEnabled()) { - LOG.debug("Commit index set to " + context.getCommitIndex()); + LOG.debug("Commit index set to {}", context.getCommitIndex()); } } @@ -239,24 +228,24 @@ public class Follower extends AbstractRaftActorBehavior { sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true, lastIndex(), lastTerm()), actor()); - return state(); + return this; } - @Override protected RaftState handleAppendEntriesReply(ActorRef sender, + @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) { - return state(); + return this; } - @Override protected RaftState handleRequestVoteReply(ActorRef sender, + @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply) { - return state(); + return this; } @Override public RaftState state() { return RaftState.Follower; } - @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) { + @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) { Object message = fromSerializableMessage(originalMessage); @@ -271,7 +260,7 @@ public class Follower extends AbstractRaftActorBehavior { } if (message instanceof ElectionTimeout) { - return RaftState.Candidate; + return switchBehavior(new Candidate(context)); } else if (message instanceof InstallSnapshot) { InstallSnapshot installSnapshot = (InstallSnapshot) message; @@ -292,46 +281,54 @@ public class Follower extends AbstractRaftActorBehavior { ); } - try { - if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) { - // this is the last chunk, create a snapshot object and apply - - snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData()); - context.getLogger().debug("Last chunk received: snapshotChunksCollected.size:{}", - snapshotChunksCollected.size()); + if(snapshotTracker == null){ + snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks()); + } - Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(), - new ArrayList