package org.opendaylight.controller.dummy.datastore;
import akka.actor.Props;
-import akka.actor.UntypedActor;
-import akka.japi.Creator;
+import akka.actor.UntypedAbstractActor;
+import com.google.common.base.Stopwatch;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DummyShard extends UntypedActor{
+public class DummyShard extends UntypedAbstractActor {
+ private static final Logger LOG = LoggerFactory.getLogger(DummyShard.class);
+
private final Configuration configuration;
private final String followerId;
- private final Logger LOG = LoggerFactory.getLogger(DummyShard.class);
private long lastMessageIndex = -1;
private long lastMessageSize = 0;
+ private Stopwatch appendEntriesWatch;
public DummyShard(Configuration configuration, String followerId) {
this.configuration = configuration;
}
@Override
- public void onReceive(Object o) throws Exception {
- if(o instanceof RequestVote){
- RequestVote req = (RequestVote) o;
+ public void onReceive(Object message) throws Exception {
+ if (message instanceof RequestVote) {
+ RequestVote req = (RequestVote) message;
sender().tell(new RequestVoteReply(req.getTerm(), true), self());
- } else if(AppendEntries.LEGACY_SERIALIZABLE_CLASS.equals(o.getClass()) || o instanceof AppendEntries) {
- AppendEntries req = AppendEntries.fromSerializable(o);
- handleAppendEntries(req);
- } else if(InstallSnapshot.SERIALIZABLE_CLASS.equals(o.getClass())) {
- InstallSnapshot req = InstallSnapshot.fromSerializable(o);
- handleInstallSnapshot(req);
- } else if(o instanceof InstallSnapshot){
- handleInstallSnapshot((InstallSnapshot) o);
+ } else if (message instanceof AppendEntries) {
+ handleAppendEntries((AppendEntries) message);
+ } else if (message instanceof InstallSnapshot) {
+ handleInstallSnapshot((InstallSnapshot) message);
} else {
- LOG.error("Unknown message : {}", o.getClass());
+ LOG.error("Unknown message : {}", message.getClass());
}
}
}
protected void handleAppendEntries(AppendEntries req) throws InterruptedException {
-
LOG.info("{} - Received AppendEntries message : leader term = {}, index = {}, prevLogIndex = {}, size = {}",
followerId, req.getTerm(),req.getLeaderCommit(), req.getPrevLogIndex(), req.getEntries().size());
- if(lastMessageIndex == req.getLeaderCommit() && req.getEntries().size() > 0 && lastMessageSize > 0){
- LOG.error("{} - Duplicate message with leaderCommit = {} prevLogIndex = {} received", followerId, req.getLeaderCommit(), req.getPrevLogIndex());
+ if (appendEntriesWatch != null) {
+ long elapsed = appendEntriesWatch.elapsed(TimeUnit.SECONDS);
+ if (elapsed >= 5) {
+ LOG.error("More than 5 seconds since last append entry, elapsed Time = {} seconds"
+ + ", leaderCommit = {}, prevLogIndex = {}, size = {}",
+ elapsed, req.getLeaderCommit(), req.getPrevLogIndex(), req.getEntries().size());
+ }
+ appendEntriesWatch.reset().start();
+ } else {
+ appendEntriesWatch = Stopwatch.createStarted();
+ }
+
+ if (lastMessageIndex == req.getLeaderCommit() && req.getEntries().size() > 0 && lastMessageSize > 0) {
+ LOG.error("{} - Duplicate message with leaderCommit = {} prevLogIndex = {} received", followerId,
+ req.getLeaderCommit(), req.getPrevLogIndex());
}
lastMessageIndex = req.getLeaderCommit();
long lastIndex = req.getLeaderCommit();
if (req.getEntries().size() > 0) {
- for(ReplicatedLogEntry entry : req.getEntries()) {
+ for (ReplicatedLogEntry entry : req.getEntries()) {
lastIndex = entry.getIndex();
}
}
if (!ignore) {
LOG.info("{} - Randomizing delay : {}", followerId, delay);
Thread.sleep(delay);
- sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm()), self());
+ sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm(),
+ DataStoreVersions.CURRENT_VERSION), self());
}
} else {
- sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm()), self());
+ sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm(),
+ DataStoreVersions.CURRENT_VERSION), self());
}
}
public static Props props(Configuration configuration, final String followerId) {
-
- return Props.create(new DummyShardCreator(configuration, followerId));
+ return Props.create(DummyShard.class, configuration, followerId);
}
-
- private static class DummyShardCreator implements Creator<DummyShard> {
-
- private static final long serialVersionUID = 1L;
- private final Configuration configuration;
- private final String followerId;
-
- DummyShardCreator(Configuration configuration, String followerId) {
- this.configuration = configuration;
- this.followerId = followerId;
- }
-
- @Override
- public DummyShard create() throws Exception {
- return new DummyShard(configuration, followerId);
- }
- }
-
}