X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeader.java;h=234f9db664e4d43e833e4e354e3d3045094dd381;hp=53e47c2f842f17ac0b2766811745a7f1ec6eabde;hb=bc7b4edec3c868a14e0a0de3a3b8e1af2406448b;hpb=11e9ade9af527aba7faeb633d3c9c7552fd09d2d diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index 53e47c2f84..234f9db664 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -19,7 +19,6 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -112,8 +111,8 @@ public class Leader extends AbstractRaftActorBehavior { scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS)); scheduleInstallSnapshotCheck( - new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000, - HEART_BEAT_INTERVAL.unit()) + new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 1000, + context.getConfigParams().getHeartBeatInterval().unit()) ); } @@ -121,7 +120,7 @@ public class Leader extends AbstractRaftActorBehavior { @Override protected RaftState handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { - context.getLogger().info("Leader: Received {}", appendEntries.toString()); + context.getLogger().debug(appendEntries.toString()); return state(); } @@ -131,7 +130,7 @@ public class Leader extends AbstractRaftActorBehavior { if(! appendEntriesReply.isSuccess()) { context.getLogger() - .info("Leader: Received {}", appendEntriesReply.toString()); + .debug(appendEntriesReply.toString()); } // Update the FollowerLogInformation @@ -241,7 +240,7 @@ public class Leader extends AbstractRaftActorBehavior { (InstallSnapshotReply) message); } } finally { - scheduleHeartBeat(HEART_BEAT_INTERVAL); + scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); } return super.handleMessage(sender, message); @@ -264,26 +263,18 @@ public class Leader extends AbstractRaftActorBehavior { context.getLogger().debug("Replicate message " + logIndex); + // Create a tracker entry we will use this later to notify the + // client actor + trackerList.add( + new ClientRequestTrackerImpl(replicate.getClientActor(), + replicate.getIdentifier(), + logIndex) + ); + if (followers.size() == 0) { - context.setCommitIndex( - replicate.getReplicatedLogEntry().getIndex()); - - context.getActor() - .tell(new ApplyState(replicate.getClientActor(), - replicate.getIdentifier(), - replicate.getReplicatedLogEntry()), - context.getActor() - ); + context.setCommitIndex(logIndex); + applyLogToStateMachine(logIndex); } else { - - // Create a tracker entry we will use this later to notify the - // client actor - trackerList.add( - new ClientRequestTrackerImpl(replicate.getClientActor(), - replicate.getIdentifier(), - logIndex) - ); - sendAppendEntries(); } } @@ -303,14 +294,9 @@ public class Leader extends AbstractRaftActorBehavior { List entries = Collections.emptyList(); if (context.getReplicatedLog().isPresent(nextIndex)) { - // TODO: Instead of sending all entries from nextIndex - // only send a fixed number of entries to each follower - // This is to avoid the situation where there are a lot of - // entries to install for a fresh follower or to a follower - // that has fallen too far behind with the log but yet is not - // eligible to receive a snapshot + // FIXME : Sending one entry at a time entries = - context.getReplicatedLog().getFrom(nextIndex); + context.getReplicatedLog().getFrom(nextIndex, 1); } followerActor.tell(