Code Review
/
controller.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
Merge "Bug 2669: Use slf4j Logger instead of akka LoggingAdapter"
[controller.git]
/
opendaylight
/
md-sal
/
sal-akka-raft
/
src
/
main
/
java
/
org
/
opendaylight
/
controller
/
cluster
/
raft
/
behaviors
/
AbstractLeader.java
diff --git
a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
index 68cf5027dff1244d145e93dac28f2680ed9ee20a..31464c5aff818c7e44bcc2a42527db1af048c061 100644
(file)
--- a/
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
+++ b/
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
@@
-346,6
+346,7
@@
public abstract class AbstractLeader extends AbstractRaftActorBehavior {
followerLogInformation.markFollowerActive();
if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
followerLogInformation.markFollowerActive();
if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
+ boolean wasLastChunk = false;
if (reply.isSuccess()) {
if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
//this was the last chunk reply
if (reply.isSuccess()) {
if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
//this was the last chunk reply
@@
-373,6
+374,7
@@
public abstract class AbstractLeader extends AbstractRaftActorBehavior {
// we can remove snapshot from the memory
setSnapshot(Optional.<ByteString>absent());
}
// we can remove snapshot from the memory
setSnapshot(Optional.<ByteString>absent());
}
+ wasLastChunk = true;
} else {
followerToSnapshot.markSendStatus(true);
} else {
followerToSnapshot.markSendStatus(true);
@@
-383,6
+385,14
@@
public abstract class AbstractLeader extends AbstractRaftActorBehavior {
followerToSnapshot.markSendStatus(false);
}
followerToSnapshot.markSendStatus(false);
}
+
+ if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) {
+ ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ if(followerActor != null) {
+ sendSnapshotChunk(followerActor, followerId);
+ }
+ }
+
} else {
LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
context.getId(), reply.getChunkIndex(), followerId,
} else {
LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
context.getId(), reply.getChunkIndex(), followerId,
@@
-546,7
+556,7
@@
public abstract class AbstractLeader extends AbstractRaftActorBehavior {
// no need to capture snapshot
sendSnapshotChunk(followerActor, e.getKey());
// no need to capture snapshot
sendSnapshotChunk(followerActor, e.getKey());
- } else {
+ } else
if (!context.isSnapshotCaptureInitiated())
{
initiateCaptureSnapshot();
//we just need 1 follower who would need snapshot to be installed.
// when we have the snapshot captured, we would again check (in SendInstallSnapshot)
initiateCaptureSnapshot();
//we just need 1 follower who would need snapshot to be installed.
// when we have the snapshot captured, we would again check (in SendInstallSnapshot)
@@
-579,6
+589,7
@@
public abstract class AbstractLeader extends AbstractRaftActorBehavior {
actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
actor());
actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
actor());
+ context.setSnapshotCaptureInitiated(true);
}
}
@@
-615,8
+626,8
@@
public abstract class AbstractLeader extends AbstractRaftActorBehavior {
context.getReplicatedLog().getSnapshotIndex(),
context.getReplicatedLog().getSnapshotTerm(),
nextSnapshotChunk,
context.getReplicatedLog().getSnapshotIndex(),
context.getReplicatedLog().getSnapshotTerm(),
nextSnapshotChunk,
- followerToSnapshot.incrementChunkIndex(),
- followerToSnapshot.getTotalChunks(),
+
followerToSnapshot.incrementChunkIndex(),
+
followerToSnapshot.getTotalChunks(),
Optional.of(followerToSnapshot.getLastChunkHashCode())
).toSerializable(),
actor()
Optional.of(followerToSnapshot.getLastChunkHashCode())
).toSerializable(),
actor()
@@
-627,7
+638,7
@@
public abstract class AbstractLeader extends AbstractRaftActorBehavior {
followerToSnapshot.getTotalChunks());
}
} catch (IOException e) {
followerToSnapshot.getTotalChunks());
}
} catch (IOException e) {
- LOG.error(
e, "{}: InstallSnapshot failed for Leader.", context.getId()
);
+ LOG.error(
"{}: InstallSnapshot failed for Leader.", context.getId(), e
);
}
}
}
}