Code Review
/
controller.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
BUG-5626: refactor BehaviorStateHolder
[controller.git]
/
opendaylight
/
md-sal
/
sal-akka-raft
/
src
/
main
/
java
/
org
/
opendaylight
/
controller
/
cluster
/
raft
/
behaviors
/
Follower.java
diff --git
a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
index 5952dc087f8889e8607edf75dc2be77ee98b4424..f483f0199e393e84b9d31bc598279f26f1d4a749 100644
(file)
--- a/
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
+++ b/
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
@@
-9,6
+9,7
@@
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
+import akka.japi.Procedure;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
@@
-37,26
+38,33
@@
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
* </ul>
*/
public class Follower extends AbstractRaftActorBehavior {
* </ul>
*/
public class Follower extends AbstractRaftActorBehavior {
-
- private SnapshotTracker snapshotTracker = null;
+ private static final int SYNC_THRESHOLD = 10;
private final SyncStatusTracker initialSyncStatusTracker;
private final SyncStatusTracker initialSyncStatusTracker;
- private static final int SYNC_THRESHOLD = 10;
+ private final Procedure<ReplicatedLogEntry> appendAndPersistCallback = new Procedure<ReplicatedLogEntry>() {
+ @Override
+ public void apply(ReplicatedLogEntry logEntry) {
+ context.getReplicatedLog().captureSnapshotIfReady(logEntry);
+ }
+ };
+
+ private SnapshotTracker snapshotTracker = null;
public Follower(RaftActorContext context) {
public Follower(RaftActorContext context) {
- this(context, null);
+ this(context, null
, (short)-1
);
}
}
- public Follower(RaftActorContext context, String initialLeaderId) {
+ public Follower(RaftActorContext context, String initialLeaderId
, short initialLeaderPayloadVersion
) {
super(context, RaftState.Follower);
leaderId = initialLeaderId;
super(context, RaftState.Follower);
leaderId = initialLeaderId;
+ setLeaderPayloadVersion(initialLeaderPayloadVersion);
initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD);
if(canStartElection()) {
if (context.getPeerIds().isEmpty() && getLeaderId() == null) {
initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD);
if(canStartElection()) {
if (context.getPeerIds().isEmpty() && getLeaderId() == null) {
- actor().tell(E
LECTION_TIMEOUT
, actor());
+ actor().tell(E
lectionTimeout.INSTANCE
, actor());
} else {
scheduleElection(electionDuration());
}
} else {
scheduleElection(electionDuration());
}
@@
-194,7
+202,7
@@
public class Follower extends AbstractRaftActorBehavior {
LOG.debug("{}: Append entry to log {}", logName(), entry.getData());
LOG.debug("{}: Append entry to log {}", logName(), entry.getData());
- context.getReplicatedLog().appendAndPersist(entry);
+ context.getReplicatedLog().appendAndPersist(entry
, appendAndPersistCallback
);
if(entry.getData() instanceof ServerConfigurationPayload) {
context.updatePeerIds((ServerConfigurationPayload)entry.getData());
if(entry.getData() instanceof ServerConfigurationPayload) {
context.updatePeerIds((ServerConfigurationPayload)entry.getData());
@@
-283,7
+291,8
@@
public class Follower extends AbstractRaftActorBehavior {
logName(), prevLogTerm, appendEntries.getPrevLogTerm());
} else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
&& appendEntries.getReplicatedToAllIndex() != -1
logName(), prevLogTerm, appendEntries.getPrevLogTerm());
} else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
&& appendEntries.getReplicatedToAllIndex() != -1
- && !isLogEntryPresent(appendEntries.getReplicatedToAllIndex())) {
+ && !isLogEntryPresent(appendEntries.getReplicatedToAllIndex())
+ && !context.getReplicatedLog().isInSnapshot(appendEntries.getReplicatedToAllIndex())) {
// This append entry comes from a leader who has it's log aggressively trimmed and so does not have
// the previous entry in it's in-memory journal
// This append entry comes from a leader who has it's log aggressively trimmed and so does not have
// the previous entry in it's in-memory journal
@@
-291,8
+300,9
@@
public class Follower extends AbstractRaftActorBehavior {
"{}: Cannot append entries because the replicatedToAllIndex {} does not appear to be in the in-memory journal",
logName(), appendEntries.getReplicatedToAllIndex());
} else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
"{}: Cannot append entries because the replicatedToAllIndex {} does not appear to be in the in-memory journal",
logName(), appendEntries.getReplicatedToAllIndex());
} else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
- && appendEntries.getReplicatedToAllIndex() != -1 && numLogEntries > 0 &&
- !isLogEntryPresent(appendEntries.getEntries().get(0).getIndex() - 1)){
+ && appendEntries.getReplicatedToAllIndex() != -1 && numLogEntries > 0
+ && !isLogEntryPresent(appendEntries.getEntries().get(0).getIndex() - 1)
+ && !context.getReplicatedLog().isInSnapshot(appendEntries.getEntries().get(0).getIndex() - 1)) {
LOG.debug(
"{}: Cannot append entries because the calculated previousIndex {} was not found in the in-memory journal",
logName(), appendEntries.getEntries().get(0).getIndex() - 1);
LOG.debug(
"{}: Cannot append entries because the calculated previousIndex {} was not found in the in-memory journal",
logName(), appendEntries.getEntries().get(0).getIndex() - 1);
@@
-351,9
+361,7
@@
public class Follower extends AbstractRaftActorBehavior {
private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {
private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {
- LOG.debug("{}: InstallSnapshot received from leader {}, datasize: {} , Chunk: {}/{}",
- logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(),
- installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
+ LOG.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot);
leaderId = installSnapshot.getLeaderId();
leaderId = installSnapshot.getLeaderId();
@@
-377,7
+385,7
@@
public class Follower extends AbstractRaftActorBehavior {
installSnapshot.getLastIncludedTerm(),
context.getTermInformation().getCurrentTerm(),
context.getTermInformation().getVotedFor(),
installSnapshot.getLastIncludedTerm(),
context.getTermInformation().getCurrentTerm(),
context.getTermInformation().getVotedFor(),
- context.getPeerServerInfo());
+ context.getPeerServerInfo(
true
));
ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() {
@Override
ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() {
@Override
@@
-419,7
+427,7
@@
public class Follower extends AbstractRaftActorBehavior {
}
@Override
}
@Override
- public void close()
throws Exception
{
+ public void close() {
stopElection();
}
stopElection();
}