import akka.japi.Creator;
import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DummyShard extends UntypedActor{
+public class DummyShard extends UntypedActor {
+ private static final Logger LOG = LoggerFactory.getLogger(DummyShard.class);
+
private final Configuration configuration;
private final String followerId;
- private final Logger LOG = LoggerFactory.getLogger(DummyShard.class);
private long lastMessageIndex = -1;
private long lastMessageSize = 0;
private Stopwatch appendEntriesWatch;
}
@Override
- public void onReceive(Object o) throws Exception {
- if(o instanceof RequestVote){
- RequestVote req = (RequestVote) o;
+ public void onReceive(Object message) throws Exception {
+ if (message instanceof RequestVote) {
+ RequestVote req = (RequestVote) message;
sender().tell(new RequestVoteReply(req.getTerm(), true), self());
- } else if(AppendEntries.LEGACY_SERIALIZABLE_CLASS.equals(o.getClass()) || o instanceof AppendEntries) {
- AppendEntries req = AppendEntries.fromSerializable(o);
- handleAppendEntries(req);
- } else if(InstallSnapshot.SERIALIZABLE_CLASS.equals(o.getClass())) {
- InstallSnapshot req = InstallSnapshot.fromSerializable(o);
- handleInstallSnapshot(req);
- } else if(o instanceof InstallSnapshot){
- handleInstallSnapshot((InstallSnapshot) o);
+ } else if (message instanceof AppendEntries) {
+ handleAppendEntries((AppendEntries) message);
+ } else if (message instanceof InstallSnapshot) {
+ handleInstallSnapshot((InstallSnapshot) message);
} else {
- LOG.error("Unknown message : {}", o.getClass());
+ LOG.error("Unknown message : {}", message.getClass());
}
}
LOG.info("{} - Received AppendEntries message : leader term = {}, index = {}, prevLogIndex = {}, size = {}",
followerId, req.getTerm(),req.getLeaderCommit(), req.getPrevLogIndex(), req.getEntries().size());
- if(appendEntriesWatch != null){
+ if (appendEntriesWatch != null) {
long elapsed = appendEntriesWatch.elapsed(TimeUnit.SECONDS);
- if(elapsed >= 5){
- LOG.error("More than 5 seconds since last append entry, elapsed Time = {} seconds" +
- ", leaderCommit = {}, prevLogIndex = {}, size = {}",
+ if (elapsed >= 5) {
+ LOG.error("More than 5 seconds since last append entry, elapsed Time = {} seconds"
+ + ", leaderCommit = {}, prevLogIndex = {}, size = {}",
elapsed, req.getLeaderCommit(), req.getPrevLogIndex(), req.getEntries().size());
}
appendEntriesWatch.reset().start();
appendEntriesWatch = Stopwatch.createStarted();
}
- if(lastMessageIndex == req.getLeaderCommit() && req.getEntries().size() > 0 && lastMessageSize > 0){
- LOG.error("{} - Duplicate message with leaderCommit = {} prevLogIndex = {} received", followerId, req.getLeaderCommit(), req.getPrevLogIndex());
+ if (lastMessageIndex == req.getLeaderCommit() && req.getEntries().size() > 0 && lastMessageSize > 0) {
+ LOG.error("{} - Duplicate message with leaderCommit = {} prevLogIndex = {} received", followerId,
+ req.getLeaderCommit(), req.getPrevLogIndex());
}
lastMessageIndex = req.getLeaderCommit();
long lastIndex = req.getLeaderCommit();
if (req.getEntries().size() > 0) {
- for(ReplicatedLogEntry entry : req.getEntries()) {
+ for (ReplicatedLogEntry entry : req.getEntries()) {
lastIndex = entry.getIndex();
}
}
if (!ignore) {
LOG.info("{} - Randomizing delay : {}", followerId, delay);
Thread.sleep(delay);
- sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm()), self());
+ sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm(),
+ DataStoreVersions.CURRENT_VERSION), self());
}
} else {
- sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm()), self());
+ sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm(),
+ DataStoreVersions.CURRENT_VERSION), self());
}
}
return new DummyShard(configuration, followerId);
}
}
-
}