Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-dummy-distributed-datastore / src / main / java / org / opendaylight / controller / dummy / datastore / DummyShard.java
index 3dffdfce575d82a0118c11d137fe501a450defe7..62eb392f2d58d3b55d04da2c00d1e44ebf55919a 100644 (file)
@@ -9,8 +9,10 @@
 package org.opendaylight.controller.dummy.datastore;
 
 import akka.actor.Props;
-import akka.actor.UntypedActor;
-import akka.japi.Creator;
+import akka.actor.UntypedAbstractActor;
+import com.google.common.base.Stopwatch;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
@@ -21,12 +23,14 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class DummyShard extends UntypedActor{
+public class DummyShard extends UntypedAbstractActor {
+    private static final Logger LOG = LoggerFactory.getLogger(DummyShard.class);
+
     private final Configuration configuration;
     private final String followerId;
-    private final Logger LOG = LoggerFactory.getLogger(DummyShard.class);
     private long lastMessageIndex  = -1;
     private long lastMessageSize = 0;
+    private Stopwatch appendEntriesWatch;
 
     public DummyShard(Configuration configuration, String followerId) {
         this.configuration = configuration;
@@ -35,20 +39,16 @@ public class DummyShard extends UntypedActor{
     }
 
     @Override
-    public void onReceive(Object o) throws Exception {
-        if(o instanceof RequestVote){
-            RequestVote req = (RequestVote) o;
+    public void onReceive(Object message) throws Exception {
+        if (message instanceof RequestVote) {
+            RequestVote req = (RequestVote) message;
             sender().tell(new RequestVoteReply(req.getTerm(), true), self());
-        } else if(AppendEntries.LEGACY_SERIALIZABLE_CLASS.equals(o.getClass()) || o instanceof AppendEntries) {
-            AppendEntries req = AppendEntries.fromSerializable(o);
-            handleAppendEntries(req);
-        } else if(InstallSnapshot.SERIALIZABLE_CLASS.equals(o.getClass())) {
-            InstallSnapshot req = InstallSnapshot.fromSerializable(o);
-            handleInstallSnapshot(req);
-        } else if(o instanceof InstallSnapshot){
-            handleInstallSnapshot((InstallSnapshot) o);
+        } else if (message instanceof AppendEntries) {
+            handleAppendEntries((AppendEntries) message);
+        } else if (message instanceof InstallSnapshot) {
+            handleInstallSnapshot((InstallSnapshot) message);
         } else {
-            LOG.error("Unknown message : {}", o.getClass());
+            LOG.error("Unknown message : {}", message.getClass());
         }
     }
 
@@ -57,12 +57,24 @@ public class DummyShard extends UntypedActor{
     }
 
     protected void handleAppendEntries(AppendEntries req) throws InterruptedException {
-
         LOG.info("{} - Received AppendEntries message : leader term = {}, index = {}, prevLogIndex = {}, size = {}",
                 followerId, req.getTerm(),req.getLeaderCommit(), req.getPrevLogIndex(), req.getEntries().size());
 
-        if(lastMessageIndex == req.getLeaderCommit() && req.getEntries().size() > 0 && lastMessageSize > 0){
-            LOG.error("{} - Duplicate message with leaderCommit = {} prevLogIndex = {} received", followerId, req.getLeaderCommit(), req.getPrevLogIndex());
+        if (appendEntriesWatch != null) {
+            long elapsed = appendEntriesWatch.elapsed(TimeUnit.SECONDS);
+            if (elapsed >= 5) {
+                LOG.error("More than 5 seconds since last append entry, elapsed Time = {} seconds"
+                                + ", leaderCommit = {}, prevLogIndex = {}, size = {}",
+                        elapsed, req.getLeaderCommit(), req.getPrevLogIndex(), req.getEntries().size());
+            }
+            appendEntriesWatch.reset().start();
+        } else {
+            appendEntriesWatch = Stopwatch.createStarted();
+        }
+
+        if (lastMessageIndex == req.getLeaderCommit() && req.getEntries().size() > 0 && lastMessageSize > 0) {
+            LOG.error("{} - Duplicate message with leaderCommit = {} prevLogIndex = {} received", followerId,
+                    req.getLeaderCommit(), req.getPrevLogIndex());
         }
 
         lastMessageIndex = req.getLeaderCommit();
@@ -70,7 +82,7 @@ public class DummyShard extends UntypedActor{
 
         long lastIndex = req.getLeaderCommit();
         if (req.getEntries().size() > 0) {
-            for(ReplicatedLogEntry entry : req.getEntries()) {
+            for (ReplicatedLogEntry entry : req.getEntries()) {
                 lastIndex = entry.getIndex();
             }
         }
@@ -87,33 +99,16 @@ public class DummyShard extends UntypedActor{
             if (!ignore) {
                 LOG.info("{} - Randomizing delay : {}", followerId, delay);
                 Thread.sleep(delay);
-                sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm()), self());
+                sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm(),
+                        DataStoreVersions.CURRENT_VERSION), self());
             }
         } else {
-            sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm()), self());
+            sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm(),
+                    DataStoreVersions.CURRENT_VERSION), self());
         }
     }
 
     public static Props props(Configuration configuration, final String followerId) {
-
-        return Props.create(new DummyShardCreator(configuration, followerId));
+        return Props.create(DummyShard.class, configuration, followerId);
     }
-
-    private static class DummyShardCreator implements Creator<DummyShard> {
-
-        private static final long serialVersionUID = 1L;
-        private final Configuration configuration;
-        private final String followerId;
-
-        DummyShardCreator(Configuration configuration, String followerId) {
-            this.configuration = configuration;
-            this.followerId = followerId;
-        }
-
-        @Override
-        public DummyShard create() throws Exception {
-            return new DummyShard(configuration, followerId);
-        }
-    }
-
 }