X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dummy-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fdummy%2Fdatastore%2FDummyShard.java;h=ad7836347d27578f6801dfec643834cab4a76f7a;hp=3dffdfce575d82a0118c11d137fe501a450defe7;hb=252ba03242407ee584c38fafdbfa1c322e66151d;hpb=97307fbbbee1f9bdb1409ac962386779dc4e93bf diff --git a/opendaylight/md-sal/sal-dummy-distributed-datastore/src/main/java/org/opendaylight/controller/dummy/datastore/DummyShard.java b/opendaylight/md-sal/sal-dummy-distributed-datastore/src/main/java/org/opendaylight/controller/dummy/datastore/DummyShard.java index 3dffdfce57..ad7836347d 100644 --- a/opendaylight/md-sal/sal-dummy-distributed-datastore/src/main/java/org/opendaylight/controller/dummy/datastore/DummyShard.java +++ b/opendaylight/md-sal/sal-dummy-distributed-datastore/src/main/java/org/opendaylight/controller/dummy/datastore/DummyShard.java @@ -11,6 +11,9 @@ package org.opendaylight.controller.dummy.datastore; import akka.actor.Props; import akka.actor.UntypedActor; import akka.japi.Creator; +import com.google.common.base.Stopwatch; +import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; @@ -27,6 +30,7 @@ public class DummyShard extends UntypedActor{ private final Logger LOG = LoggerFactory.getLogger(DummyShard.class); private long lastMessageIndex = -1; private long lastMessageSize = 0; + private Stopwatch appendEntriesWatch; public DummyShard(Configuration configuration, String followerId) { this.configuration = configuration; @@ -39,10 +43,9 @@ public class DummyShard extends UntypedActor{ if(o instanceof RequestVote){ RequestVote req = (RequestVote) o; sender().tell(new RequestVoteReply(req.getTerm(), true), self()); - } else if(AppendEntries.LEGACY_SERIALIZABLE_CLASS.equals(o.getClass()) || o instanceof AppendEntries) { - AppendEntries req = AppendEntries.fromSerializable(o); - handleAppendEntries(req); - } else if(InstallSnapshot.SERIALIZABLE_CLASS.equals(o.getClass())) { + } else if(o instanceof AppendEntries) { + handleAppendEntries((AppendEntries)o); + } else if(InstallSnapshot.isSerializedType(o)) { InstallSnapshot req = InstallSnapshot.fromSerializable(o); handleInstallSnapshot(req); } else if(o instanceof InstallSnapshot){ @@ -57,10 +60,21 @@ public class DummyShard extends UntypedActor{ } protected void handleAppendEntries(AppendEntries req) throws InterruptedException { - LOG.info("{} - Received AppendEntries message : leader term = {}, index = {}, prevLogIndex = {}, size = {}", followerId, req.getTerm(),req.getLeaderCommit(), req.getPrevLogIndex(), req.getEntries().size()); + if(appendEntriesWatch != null){ + long elapsed = appendEntriesWatch.elapsed(TimeUnit.SECONDS); + if(elapsed >= 5){ + LOG.error("More than 5 seconds since last append entry, elapsed Time = {} seconds" + + ", leaderCommit = {}, prevLogIndex = {}, size = {}", + elapsed, req.getLeaderCommit(), req.getPrevLogIndex(), req.getEntries().size()); + } + appendEntriesWatch.reset().start(); + } else { + appendEntriesWatch = Stopwatch.createStarted(); + } + if(lastMessageIndex == req.getLeaderCommit() && req.getEntries().size() > 0 && lastMessageSize > 0){ LOG.error("{} - Duplicate message with leaderCommit = {} prevLogIndex = {} received", followerId, req.getLeaderCommit(), req.getPrevLogIndex()); } @@ -87,10 +101,12 @@ public class DummyShard extends UntypedActor{ if (!ignore) { LOG.info("{} - Randomizing delay : {}", followerId, delay); Thread.sleep(delay); - sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm()), self()); + sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm(), + DataStoreVersions.CURRENT_VERSION), self()); } } else { - sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm()), self()); + sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm(), + DataStoreVersions.CURRENT_VERSION), self()); } }