import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DummyShard extends UntypedActor{
+public class DummyShard extends UntypedActor {
+ private static final Logger LOG = LoggerFactory.getLogger(DummyShard.class);
+
private final Configuration configuration;
private final String followerId;
- private final Logger LOG = LoggerFactory.getLogger(DummyShard.class);
private long lastMessageIndex = -1;
private long lastMessageSize = 0;
private Stopwatch appendEntriesWatch;
}
@Override
- public void onReceive(Object o) throws Exception {
- if(o instanceof RequestVote){
- RequestVote req = (RequestVote) o;
+ public void onReceive(Object message) throws Exception {
+ if (message instanceof RequestVote) {
+ RequestVote req = (RequestVote) message;
sender().tell(new RequestVoteReply(req.getTerm(), true), self());
- } else if(o instanceof AppendEntries) {
- handleAppendEntries((AppendEntries)o);
- } else if(o instanceof InstallSnapshot){
- handleInstallSnapshot((InstallSnapshot) o);
+ } else if (message instanceof AppendEntries) {
+ handleAppendEntries((AppendEntries) message);
+ } else if (message instanceof InstallSnapshot) {
+ handleInstallSnapshot((InstallSnapshot) message);
} else {
- LOG.error("Unknown message : {}", o.getClass());
+ LOG.error("Unknown message : {}", message.getClass());
}
}
LOG.info("{} - Received AppendEntries message : leader term = {}, index = {}, prevLogIndex = {}, size = {}",
followerId, req.getTerm(),req.getLeaderCommit(), req.getPrevLogIndex(), req.getEntries().size());
- if(appendEntriesWatch != null){
+ if (appendEntriesWatch != null) {
long elapsed = appendEntriesWatch.elapsed(TimeUnit.SECONDS);
- if(elapsed >= 5){
- LOG.error("More than 5 seconds since last append entry, elapsed Time = {} seconds" +
- ", leaderCommit = {}, prevLogIndex = {}, size = {}",
+ if (elapsed >= 5) {
+ LOG.error("More than 5 seconds since last append entry, elapsed Time = {} seconds"
+ + ", leaderCommit = {}, prevLogIndex = {}, size = {}",
elapsed, req.getLeaderCommit(), req.getPrevLogIndex(), req.getEntries().size());
}
appendEntriesWatch.reset().start();
appendEntriesWatch = Stopwatch.createStarted();
}
- if(lastMessageIndex == req.getLeaderCommit() && req.getEntries().size() > 0 && lastMessageSize > 0){
- LOG.error("{} - Duplicate message with leaderCommit = {} prevLogIndex = {} received", followerId, req.getLeaderCommit(), req.getPrevLogIndex());
+ if (lastMessageIndex == req.getLeaderCommit() && req.getEntries().size() > 0 && lastMessageSize > 0) {
+ LOG.error("{} - Duplicate message with leaderCommit = {} prevLogIndex = {} received", followerId,
+ req.getLeaderCommit(), req.getPrevLogIndex());
}
lastMessageIndex = req.getLeaderCommit();
long lastIndex = req.getLeaderCommit();
if (req.getEntries().size() > 0) {
- for(ReplicatedLogEntry entry : req.getEntries()) {
+ for (ReplicatedLogEntry entry : req.getEntries()) {
lastIndex = entry.getIndex();
}
}
return new DummyShard(configuration, followerId);
}
}
-
}