import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Creator;
+import com.google.common.base.Stopwatch;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
private final Logger LOG = LoggerFactory.getLogger(DummyShard.class);
private long lastMessageIndex = -1;
private long lastMessageSize = 0;
+ private Stopwatch appendEntriesWatch;
public DummyShard(Configuration configuration, String followerId) {
this.configuration = configuration;
if(o instanceof RequestVote){
RequestVote req = (RequestVote) o;
sender().tell(new RequestVoteReply(req.getTerm(), true), self());
- } else if(AppendEntries.LEGACY_SERIALIZABLE_CLASS.equals(o.getClass()) || o instanceof AppendEntries) {
- AppendEntries req = AppendEntries.fromSerializable(o);
- handleAppendEntries(req);
- } else if(InstallSnapshot.SERIALIZABLE_CLASS.equals(o.getClass())) {
- InstallSnapshot req = InstallSnapshot.fromSerializable(o);
- handleInstallSnapshot(req);
+ } else if(o instanceof AppendEntries) {
+ handleAppendEntries((AppendEntries)o);
} else if(o instanceof InstallSnapshot){
handleInstallSnapshot((InstallSnapshot) o);
} else {
}
protected void handleAppendEntries(AppendEntries req) throws InterruptedException {
-
LOG.info("{} - Received AppendEntries message : leader term = {}, index = {}, prevLogIndex = {}, size = {}",
followerId, req.getTerm(),req.getLeaderCommit(), req.getPrevLogIndex(), req.getEntries().size());
+ if(appendEntriesWatch != null){
+ long elapsed = appendEntriesWatch.elapsed(TimeUnit.SECONDS);
+ if(elapsed >= 5){
+ LOG.error("More than 5 seconds since last append entry, elapsed Time = {} seconds" +
+ ", leaderCommit = {}, prevLogIndex = {}, size = {}",
+ elapsed, req.getLeaderCommit(), req.getPrevLogIndex(), req.getEntries().size());
+ }
+ appendEntriesWatch.reset().start();
+ } else {
+ appendEntriesWatch = Stopwatch.createStarted();
+ }
+
if(lastMessageIndex == req.getLeaderCommit() && req.getEntries().size() > 0 && lastMessageSize > 0){
LOG.error("{} - Duplicate message with leaderCommit = {} prevLogIndex = {} received", followerId, req.getLeaderCommit(), req.getPrevLogIndex());
}
if (!ignore) {
LOG.info("{} - Randomizing delay : {}", followerId, delay);
Thread.sleep(delay);
- sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm()), self());
+ sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm(),
+ DataStoreVersions.CURRENT_VERSION), self());
}
} else {
- sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm()), self());
+ sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm(),
+ DataStoreVersions.CURRENT_VERSION), self());
}
}