CDS: Include CAN_COMMIT phase in rate limiter time period 59/25259/1
authorTom Pantelis <tpanteli@brocade.com>
Thu, 2 Jul 2015 02:53:56 +0000 (22:53 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 13 Aug 2015 17:47:14 +0000 (17:47 +0000)
I was testing with simulated latency in the followers. With a high enough
latency and tx thru-put, the pending commit queue in the
ShardCommitCoordinator got increasingly behind until latencies built up
enough to cause AskTimeoutExeption's on the front-end.

The rate limiter was throttling but not enough. I realized that the rate
limiter times the commit phase but not the canCommit phase. The latter is
what times out with pending tx's sitting in the queue waiting for canCommit.
So I changed ThreePhaseCommitCohortProxy to also time the canCommit
phase. This alleviated the timeouts - even with a really high max latency of
500 ms and 100 tx / sec client thru-put. The rate limiter thru-put
reduced it to about 3 / sec.

Change-Id: I6dc73d1d657519b9410ad034c69d26f19a0cb263
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
(cherry picked from commit 1783b656d3b1712c73c6596f3ae15e54e801d5e5)

opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCallback.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCallback.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCallbackTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCommitCallbackTest.java [deleted file]

index 592e6bc9c9f5ea0c1856dc2674aa5c0e81e79180..c20b10229a54bcd5c09ad077dca77855058175e8 100644 (file)
@@ -23,6 +23,14 @@ interface OperationCallback {
         @Override
         public void failure() {
         }
+
+        @Override
+        public void pause() {
+        }
+
+        @Override
+        public void resume() {
+        }
     };
 
     class Reference extends AtomicReference<OperationCallback> {
@@ -34,6 +42,8 @@ interface OperationCallback {
     }
 
     void run();
+    void pause();
+    void resume();
     void success();
     void failure();
 }
index 52e7a78e5b89184a47a83dfb502c5f813fed032d..09b6568e1ade016accf0ab80788065950d469ec1 100644 (file)
@@ -40,6 +40,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
     private final List<Future<ActorSelection>> cohortFutures;
     private volatile List<ActorSelection> cohorts;
     private final String transactionId;
+    private volatile OperationCallback commitOperationCallback;
 
     public ThreePhaseCommitCohortProxy(ActorContext actorContext,
             List<Future<ActorSelection>> cohortFutures, String transactionId) {
@@ -110,6 +111,11 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
             return;
         }
 
+        commitOperationCallback = cohortFutures.isEmpty() ? OperationCallback.NO_OP_CALLBACK :
+            new TransactionRateLimitingCallback(actorContext);
+
+        commitOperationCallback.run();
+
         final Object message = new CanCommitTransaction(transactionId).toSerializable();
 
         final Iterator<ActorSelection> iterator = cohorts.iterator();
@@ -122,9 +128,14 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
                         LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
                     }
                     returnFuture.setException(failure);
+                    commitOperationCallback.failure();
                     return;
                 }
 
+                // Only the first call to pause takes effect - subsequent calls before resume are no-ops. So
+                // this means we'll only time the first transaction canCommit which should be fine.
+                commitOperationCallback.pause();
+
                 boolean result = true;
                 if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
                     CanCommitTransactionReply reply =
@@ -191,8 +202,8 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
 
     @Override
     public ListenableFuture<Void> commit() {
-        OperationCallback operationCallback = cohortFutures.isEmpty() ? OperationCallback.NO_OP_CALLBACK :
-                new TransactionRateLimitingCallback(actorContext);
+        OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
+            OperationCallback.NO_OP_CALLBACK;
 
         return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
                 CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
@@ -250,7 +261,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
             LOG.debug("Tx {} finish {}", transactionId, operationName);
         }
 
-        callback.run();
+        callback.resume();
 
         Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
 
index 678891446ab9b3da8c87315497bb1f044b681897..8ba2cb9ad936d8636c0b60bf6e40db0493d842d0 100644 (file)
@@ -9,7 +9,10 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 
 /**
@@ -17,8 +20,18 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
  * transaction
  */
 public class TransactionRateLimitingCallback implements OperationCallback{
+    private static Ticker TICKER = Ticker.systemTicker();
+
+    private static enum State {
+        STOPPED,
+        RUNNING,
+        PAUSED
+    }
+
     private final Timer commitTimer;
-    private Timer.Context timerContext;
+    private long startTime;
+    private long elapsedTime;
+    private volatile State state = State.STOPPED;
 
     TransactionRateLimitingCallback(ActorContext actorContext){
         commitTimer = actorContext.getOperationTimer(ActorContext.COMMIT);
@@ -26,13 +39,32 @@ public class TransactionRateLimitingCallback implements OperationCallback{
 
     @Override
     public void run() {
-        timerContext = commitTimer.time();
+        Preconditions.checkState(state == State.STOPPED, "state is not STOPPED");
+        resume();
+    }
+
+    @Override
+    public void pause() {
+        if(state == State.RUNNING) {
+            elapsedTime += TICKER.read() - startTime;
+            state = State.PAUSED;
+        }
+    }
+
+    @Override
+    public void resume() {
+        if(state != State.RUNNING) {
+            startTime = TICKER.read();
+            state = State.RUNNING;
+        }
     }
 
     @Override
     public void success() {
-        Preconditions.checkState(timerContext != null, "Call run before success");
-        timerContext.stop();
+        Preconditions.checkState(state != State.STOPPED, "state is STOPPED");
+        pause();
+        commitTimer.update(elapsedTime, TimeUnit.NANOSECONDS);
+        state = State.STOPPED;
     }
 
     @Override
@@ -42,4 +74,8 @@ public class TransactionRateLimitingCallback implements OperationCallback{
         // not going to be useful - so we leave it as it is
     }
 
+    @VisibleForTesting
+    static void setTicker(Ticker ticker) {
+        TICKER = ticker;
+    }
 }
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCallbackTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCallbackTest.java
new file mode 100644 (file)
index 0000000..6e282e7
--- /dev/null
@@ -0,0 +1,96 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Ticker;
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+
+/**
+ * Unit tests for TransactionRateLimitingCallback.
+ *
+ * @author Thomas Pantelis
+ */
+public class TransactionRateLimitingCallbackTest {
+
+    @Mock
+    ActorContext mockContext;
+
+    @Mock
+    Timer mockTimer;
+
+    @Mock
+    Ticker mockTicker;
+
+    TransactionRateLimitingCallback callback;
+
+    @Before
+    public void setUp(){
+        MockitoAnnotations.initMocks(this);
+        doReturn(mockTimer).when(mockContext).getOperationTimer(ActorContext.COMMIT);
+        callback = new TransactionRateLimitingCallback(mockContext);
+        TransactionRateLimitingCallback.setTicker(mockTicker);
+    }
+
+    @Test
+    public void testSuccessWithoutPause() {
+        doReturn(1L).doReturn(201L).when(mockTicker).read();
+
+        callback.run();
+        callback.success();
+
+        verify(mockTimer).update(200L, TimeUnit.NANOSECONDS);
+    }
+
+    @Test
+    public void testSuccessWithPause() {
+        doReturn(1L).doReturn(201L).doReturn(301L).doReturn(351L).when(mockTicker).read();
+
+        callback.run();
+        callback.pause();
+        callback.pause();
+        callback.resume();
+        callback.resume();
+        callback.success();
+
+        verify(mockTimer).update(250L, TimeUnit.NANOSECONDS);
+    }
+
+    @Test
+    public void testFailure() {
+        doReturn(1L).when(mockTicker).read();
+
+        callback.run();
+        callback.failure();
+
+        verify(mockTimer, never()).update(anyLong(), any(TimeUnit.class));
+    }
+
+    @Test
+    public void testSuccessWithoutRun(){
+        try {
+            callback.success();
+            fail("Expected IllegalStateException");
+        } catch(IllegalStateException e){
+
+        }
+
+        verify(mockTimer, never()).update(anyLong(), any(TimeUnit.class));
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCommitCallbackTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCommitCallbackTest.java
deleted file mode 100644 (file)
index de3a56d..0000000
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import com.codahale.metrics.Snapshot;
-import com.codahale.metrics.Timer;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-
-public class TransactionRateLimitingCommitCallbackTest {
-    @Mock
-    public ActorContext actorContext;
-
-    @Mock
-    public DatastoreContext datastoreContext;
-
-    @Mock
-    public Timer commitTimer;
-
-    @Mock
-    private Timer.Context commitTimerContext;
-
-    @Mock
-    private Snapshot commitSnapshot;
-
-    @Before
-    public void setUp(){
-        MockitoAnnotations.initMocks(this);
-        doReturn(datastoreContext).when(actorContext).getDatastoreContext();
-        doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
-        doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
-        doReturn(commitTimerContext).when(commitTimer).time();
-        doReturn(commitSnapshot).when(commitTimer).getSnapshot();
-    }
-
-    @Test
-    public void testSuccess(){
-        TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
-        commitCallback.run();
-        commitCallback.success();
-
-        verify(commitTimerContext).stop();
-    }
-
-    @Test
-    public void testSuccessWithoutRun(){
-        TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
-
-        try {
-            commitCallback.success();
-            fail("Expected IllegalStateException");
-        } catch(IllegalStateException e){
-
-        }
-        verify(commitTimerContext, never()).stop();
-    }
-
-
-    @Test
-    public void testFailure(){
-        TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
-        commitCallback.run();
-        commitCallback.failure();
-
-        verify(commitTimerContext, never()).stop();
-    }
-
-}
\ No newline at end of file