Merge "BUG-2637: migrate features"
authorAnil Vishnoi <vishnoianil@gmail.com>
Fri, 20 Feb 2015 20:01:18 +0000 (20:01 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 20 Feb 2015 20:01:19 +0000 (20:01 +0000)
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
opendaylight/md-sal/sal-dummy-distributed-datastore/src/main/java/org/opendaylight/controller/dummy/datastore/DummyShard.java
opendaylight/md-sal/sal-dummy-distributed-datastore/src/main/resources/simplelogger.properties

index 854ceb23d047fabea219f3861c5f44c0f2afc907..65254f2d6277c34c9773570e5938d8a005ab3015 100644 (file)
@@ -788,7 +788,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
                             dataSizeSinceLastSnapshot = 0;
 
-                            LOG.info("{}: Initiating Snapshot Capture..", persistenceId());
+                            LOG.info("{}: Initiating Snapshot Capture, journalSize = {}, dataSizeForCheck = {}," +
+                                " dataThreshold = {}", persistenceId(), journalSize, dataSizeForCheck, dataThreshold);
+
                             long lastAppliedIndex = -1;
                             long lastAppliedTerm = -1;
 
index b2bb127eab525e35b5f7db5995e9bd57fcbe0746..66b8fba674161ddbc8f9f2e5a64154b62226c683 100644 (file)
@@ -171,6 +171,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             return this;
         }
 
+        if(followerLogInformation.timeSinceLastActivity() >
+                context.getConfigParams().getElectionTimeOutInterval().toMillis()) {
+            LOG.error("{} : handleAppendEntriesReply delayed beyond election timeout, " +
+                            "appendEntriesReply : {}, timeSinceLastActivity : {}, lastApplied : {}, commitIndex : {}",
+                    logName(), appendEntriesReply, followerLogInformation.timeSinceLastActivity(),
+                    context.getLastApplied(), context.getCommitIndex());
+        }
+
         followerLogInformation.markFollowerActive();
 
         if (appendEntriesReply.isSuccess()) {
index 9da6a3b5a41a025fd1a3ffcf30d42789701a6bc1..e72f4b2675eb886226ad660204fe706513937094 100644 (file)
@@ -13,17 +13,27 @@ odl-cluster-data {
     loggers = ["akka.event.slf4j.Slf4jLogger"]
 
     actor {
-
       provider = "akka.cluster.ClusterActorRefProvider"
       serializers {
-                java = "akka.serialization.JavaSerializer"
-                proto = "akka.remote.serialization.ProtobufSerializer"
-              }
+        java = "akka.serialization.JavaSerializer"
+        proto = "akka.remote.serialization.ProtobufSerializer"
+      }
+
+      serialization-bindings {
+        "com.google.protobuf.Message" = proto
+      }
 
-              serialization-bindings {
-                  "com.google.protobuf.Message" = proto
+      default-dispatcher {
+        # Setting throughput to 1 makes the dispatcher fair. It processes 1 message from
+        # the mailbox before moving on to the next mailbox
+        throughput = 1
+      }
 
-              }
+      default-mailbox {
+        # When not using a BalancingDispatcher it is recommended that we use the SingleConsumerOnlyUnboundedMailbox
+        # as it is the most efficient for multiple producer/single consumer use cases
+        mailbox-type="akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
+      }
     }
     remote {
       log-remote-lifecycle-events = off
index 4f472266c1f56acbd8fc531ae7189f1ae91951b4..c51ea80726e54d9cf656e193ca8521d242206c76 100644 (file)
@@ -342,7 +342,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
             timerContext.stop();
 
             Snapshot timerSnapshot = commitTimer.getSnapshot();
-            double allowedLatencyInNanos = timerSnapshot.get98thPercentile();
+            double allowedLatencyInNanos = timerSnapshot.get95thPercentile();
 
             long commitTimeoutInSeconds = actorContext.getDatastoreContext()
                     .getShardTransactionCommitTimeoutInSeconds();
index d2396e0524f340844ea5f7c0e35dfd6cad0b4806..b013515f2595950cba669ac08dfdb1a8eefe37fa 100644 (file)
@@ -71,7 +71,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
         doReturn(commitTimerContext).when(commitTimer).time();
         doReturn(commitSnapshot).when(commitTimer).getSnapshot();
-        doReturn(TimeUnit.MILLISECONDS.toNanos(2000) * 1.0).when(commitSnapshot).get98thPercentile();
+        doReturn(TimeUnit.MILLISECONDS.toNanos(2000) * 1.0).when(commitSnapshot).get95thPercentile();
         doReturn(10.0).when(actorContext).getTxCreationLimit();
     }
 
index 3dffdfce575d82a0118c11d137fe501a450defe7..0b72a32f1033884ba983c71b2651a4a88ad6c7b1 100644 (file)
@@ -11,6 +11,8 @@ package org.opendaylight.controller.dummy.datastore;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.japi.Creator;
+import com.google.common.base.Stopwatch;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
@@ -27,6 +29,7 @@ public class DummyShard extends UntypedActor{
     private final Logger LOG = LoggerFactory.getLogger(DummyShard.class);
     private long lastMessageIndex  = -1;
     private long lastMessageSize = 0;
+    private Stopwatch appendEntriesWatch;
 
     public DummyShard(Configuration configuration, String followerId) {
         this.configuration = configuration;
@@ -57,10 +60,21 @@ public class DummyShard extends UntypedActor{
     }
 
     protected void handleAppendEntries(AppendEntries req) throws InterruptedException {
-
         LOG.info("{} - Received AppendEntries message : leader term = {}, index = {}, prevLogIndex = {}, size = {}",
                 followerId, req.getTerm(),req.getLeaderCommit(), req.getPrevLogIndex(), req.getEntries().size());
 
+        if(appendEntriesWatch != null){
+            long elapsed = appendEntriesWatch.elapsed(TimeUnit.SECONDS);
+            if(elapsed >= 5){
+                LOG.error("More than 5 seconds since last append entry, elapsed Time = {} seconds" +
+                                ", leaderCommit = {}, prevLogIndex = {}, size = {}",
+                        elapsed, req.getLeaderCommit(), req.getPrevLogIndex(), req.getEntries().size());
+            }
+            appendEntriesWatch.reset().start();
+        } else {
+            appendEntriesWatch = Stopwatch.createStarted();
+        }
+
         if(lastMessageIndex == req.getLeaderCommit() && req.getEntries().size() > 0 && lastMessageSize > 0){
             LOG.error("{} - Duplicate message with leaderCommit = {} prevLogIndex = {} received", followerId, req.getLeaderCommit(), req.getPrevLogIndex());
         }
index 067c048231d1da73c7c550fc4934b9523132c396..cd2d082079981b4f5323f823471fb3a7299c163c 100644 (file)
@@ -3,4 +3,4 @@ org.slf4j.simpleLogger.dateTimeFormat=hh:mm:ss,S a
 org.slf4j.simpleLogger.logFile=System.out
 org.slf4j.simpleLogger.showShortLogName=true
 org.slf4j.simpleLogger.levelInBrackets=true
-org.slf4j.simpleLogger.defaultLogLevel=info
\ No newline at end of file
+org.slf4j.simpleLogger.defaultLogLevel=error
\ No newline at end of file