Code Review
/
controller.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
Deprecate InstallSnapshot protobuff messages
[controller.git]
/
opendaylight
/
md-sal
/
sal-akka-raft
/
src
/
main
/
java
/
org
/
opendaylight
/
controller
/
cluster
/
raft
/
behaviors
/
Follower.java
diff --git
a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
index c8535614a9462bb5d4ee6cdfb6c3e3428c1cb854..bcc2480f459318f35ce3aa30e2d818404626d99b 100644
(file)
--- a/
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
+++ b/
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
@@
-14,6
+14,7
@@
import java.util.ArrayList;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
@@
-44,12
+45,17
@@
public class Follower extends AbstractRaftActorBehavior {
private static final int SYNC_THRESHOLD = 10;
public Follower(RaftActorContext context) {
private static final int SYNC_THRESHOLD = 10;
public Follower(RaftActorContext context) {
+ this(context, null);
+ }
+
+ public Follower(RaftActorContext context, String initialLeaderId) {
super(context, RaftState.Follower);
super(context, RaftState.Follower);
+ leaderId = initialLeaderId;
initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD);
initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD);
- if(c
ontext.getRaftPolicy().automaticElectionsEnabled
()) {
- if (context.getPeer
Addresses().isEmpty()
) {
+ if(c
anStartElection
()) {
+ if (context.getPeer
Ids().isEmpty() && getLeaderId() == null
) {
actor().tell(ELECTION_TIMEOUT, actor());
} else {
scheduleElection(electionDuration());
actor().tell(ELECTION_TIMEOUT, actor());
} else {
scheduleElection(electionDuration());
@@
-103,7
+109,7
@@
public class Follower extends AbstractRaftActorBehavior {
// to make it easier to read. Before refactoring ensure tests
// cover the code properly
// to make it easier to read. Before refactoring ensure tests
// cover the code properly
- if (snapshotTracker != null) {
+ if (snapshotTracker != null
|| context.getSnapshotManager().isApplying()
) {
// if snapshot install is in progress, follower should just acknowledge append entries with a reply.
AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
lastIndex(), lastTerm(), context.getPayloadVersion());
// if snapshot install is in progress, follower should just acknowledge append entries with a reply.
AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
lastIndex(), lastTerm(), context.getPayloadVersion());
@@
-189,6
+195,10
@@
public class Follower extends AbstractRaftActorBehavior {
LOG.debug("{}: Append entry to log {}", logName(), entry.getData());
context.getReplicatedLog().appendAndPersist(entry);
LOG.debug("{}: Append entry to log {}", logName(), entry.getData());
context.getReplicatedLog().appendAndPersist(entry);
+
+ if(entry.getData() instanceof ServerConfigurationPayload) {
+ context.updatePeerIds((ServerConfigurationPayload)entry.getData());
+ }
}
LOG.debug("{}: Log size is now {}", logName(), context.getReplicatedLog().size());
}
LOG.debug("{}: Log size is now {}", logName(), context.getReplicatedLog().size());
@@
-273,7
+283,8
@@
public class Follower extends AbstractRaftActorBehavior {
logName(), prevLogTerm, appendEntries.getPrevLogTerm());
} else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
&& appendEntries.getReplicatedToAllIndex() != -1
logName(), prevLogTerm, appendEntries.getPrevLogTerm());
} else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
&& appendEntries.getReplicatedToAllIndex() != -1
- && !isLogEntryPresent(appendEntries.getReplicatedToAllIndex())) {
+ && !isLogEntryPresent(appendEntries.getReplicatedToAllIndex())
+ && !context.getReplicatedLog().isInSnapshot(appendEntries.getReplicatedToAllIndex())) {
// This append entry comes from a leader who has it's log aggressively trimmed and so does not have
// the previous entry in it's in-memory journal
// This append entry comes from a leader who has it's log aggressively trimmed and so does not have
// the previous entry in it's in-memory journal
@@
-281,8
+292,9
@@
public class Follower extends AbstractRaftActorBehavior {
"{}: Cannot append entries because the replicatedToAllIndex {} does not appear to be in the in-memory journal",
logName(), appendEntries.getReplicatedToAllIndex());
} else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
"{}: Cannot append entries because the replicatedToAllIndex {} does not appear to be in the in-memory journal",
logName(), appendEntries.getReplicatedToAllIndex());
} else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
- && appendEntries.getReplicatedToAllIndex() != -1 && numLogEntries > 0 &&
- !isLogEntryPresent(appendEntries.getEntries().get(0).getIndex() - 1)){
+ && appendEntries.getReplicatedToAllIndex() != -1 && numLogEntries > 0
+ && !isLogEntryPresent(appendEntries.getEntries().get(0).getIndex() - 1)
+ && !context.getReplicatedLog().isInSnapshot(appendEntries.getEntries().get(0).getIndex() - 1)) {
LOG.debug(
"{}: Cannot append entries because the calculated previousIndex {} was not found in the in-memory journal",
logName(), appendEntries.getEntries().get(0).getIndex() - 1);
LOG.debug(
"{}: Cannot append entries because the calculated previousIndex {} was not found in the in-memory journal",
logName(), appendEntries.getEntries().get(0).getIndex() - 1);
@@
-320,8
+332,12
@@
public class Follower extends AbstractRaftActorBehavior {
}
if (message instanceof ElectionTimeout) {
}
if (message instanceof ElectionTimeout) {
- LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
- return internalSwitchBehavior(RaftState.Candidate);
+ if(canStartElection()) {
+ LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
+ return internalSwitchBehavior(RaftState.Candidate);
+ } else {
+ return this;
+ }
} else if (message instanceof InstallSnapshot) {
InstallSnapshot installSnapshot = (InstallSnapshot) message;
} else if (message instanceof InstallSnapshot) {
InstallSnapshot installSnapshot = (InstallSnapshot) message;
@@
-335,11
+351,11
@@
public class Follower extends AbstractRaftActorBehavior {
return super.handleMessage(sender, message);
}
return super.handleMessage(sender, message);
}
- private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
+ private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {
+
+ LOG.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot);
- LOG.debug("{}: InstallSnapshot received from leader {}, datasize: {} , Chunk: {}/{}",
- logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(),
- installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
+ leaderId = installSnapshot.getLeaderId();
if(snapshotTracker == null){
snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks());
if(snapshotTracker == null){
snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks());
@@
-348,6
+364,9
@@
public class Follower extends AbstractRaftActorBehavior {
updateInitialSyncStatus(installSnapshot.getLastIncludedIndex(), installSnapshot.getLeaderId());
try {
updateInitialSyncStatus(installSnapshot.getLastIncludedIndex(), installSnapshot.getLeaderId());
try {
+ final InstallSnapshotReply reply = new InstallSnapshotReply(
+ currentTerm(), context.getId(), installSnapshot.getChunkIndex(), true);
+
if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
installSnapshot.getLastChunkHashCode())){
Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(),
if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
installSnapshot.getLastChunkHashCode())){
Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(),
@@
-355,21
+374,33
@@
public class Follower extends AbstractRaftActorBehavior {
installSnapshot.getLastIncludedIndex(),
installSnapshot.getLastIncludedTerm(),
installSnapshot.getLastIncludedIndex(),
installSnapshot.getLastIncludedIndex(),
installSnapshot.getLastIncludedTerm(),
installSnapshot.getLastIncludedIndex(),
- installSnapshot.getLastIncludedTerm());
-
- actor().tell(new ApplySnapshot(snapshot), actor());
+ installSnapshot.getLastIncludedTerm(),
+ context.getTermInformation().getCurrentTerm(),
+ context.getTermInformation().getVotedFor(),
+ context.getPeerServerInfo(true));
- snapshotTracker = null;
+ ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() {
+ @Override
+ public void onSuccess() {
+ LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
- }
+ sender.tell(reply, actor());
+ }
- InstallSnapshotReply reply = new InstallSnapshotReply(
- currentTerm(), context.getId(), installSnapshot.getChunkIndex(), true);
+ @Override
+ public void onFailure() {
+ sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(), -1, false), actor());
+ }
+ };
-
LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply
);
+
actor().tell(new ApplySnapshot(snapshot, applySnapshotCallback), actor()
);
- sender.tell(reply, actor());
+ snapshotTracker = null;
+ } else {
+ LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
+ sender.tell(reply, actor());
+ }
} catch (SnapshotTracker.InvalidChunkException e) {
LOG.debug("{}: Exception in InstallSnapshot of follower", logName(), e);
} catch (SnapshotTracker.InvalidChunkException e) {
LOG.debug("{}: Exception in InstallSnapshot of follower", logName(), e);