From: Tom Pantelis Date: Thu, 2 Jul 2015 02:53:56 +0000 (-0400) Subject: CDS: Include CAN_COMMIT phase in rate limiter time period X-Git-Tag: release/beryllium~355 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=06691686cff16d5766a1431ecb43d22ca410b71c CDS: Include CAN_COMMIT phase in rate limiter time period I was testing with simulated latency in the followers. With a high enough latency and tx thru-put, the pending commit queue in the ShardCommitCoordinator got increasingly behind until latencies built up enough to cause AskTimeoutExeption's on the front-end. The rate limiter was throttling but not enough. I realized that the rate limiter times the commit phase but not the canCommit phase. The latter is what times out with pending tx's sitting in the queue waiting for canCommit. So I changed ThreePhaseCommitCohortProxy to also time the canCommit phase. This alleviated the timeouts - even with a really high max latency of 500 ms and 100 tx / sec client thru-put. The rate limiter thru-put reduced it to about 3 / sec. Change-Id: I6dc73d1d657519b9410ad034c69d26f19a0cb263 Signed-off-by: Tom Pantelis (cherry picked from commit 1783b656d3b1712c73c6596f3ae15e54e801d5e5) --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCallback.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCallback.java index 592e6bc9c9..c20b10229a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCallback.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCallback.java @@ -23,6 +23,14 @@ interface OperationCallback { @Override public void failure() { } + + @Override + public void pause() { + } + + @Override + public void resume() { + } }; class Reference extends AtomicReference { @@ -34,6 +42,8 @@ interface OperationCallback { } void run(); + void pause(); + void resume(); void success(); void failure(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index 52e7a78e5b..09b6568e1a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -40,6 +40,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< private final List> cohortFutures; private volatile List cohorts; private final String transactionId; + private volatile OperationCallback commitOperationCallback; public ThreePhaseCommitCohortProxy(ActorContext actorContext, List> cohortFutures, String transactionId) { @@ -110,6 +111,11 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< return; } + commitOperationCallback = cohortFutures.isEmpty() ? OperationCallback.NO_OP_CALLBACK : + new TransactionRateLimitingCallback(actorContext); + + commitOperationCallback.run(); + final Object message = new CanCommitTransaction(transactionId).toSerializable(); final Iterator iterator = cohorts.iterator(); @@ -122,9 +128,14 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure); } returnFuture.setException(failure); + commitOperationCallback.failure(); return; } + // Only the first call to pause takes effect - subsequent calls before resume are no-ops. So + // this means we'll only time the first transaction canCommit which should be fine. + commitOperationCallback.pause(); + boolean result = true; if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) { CanCommitTransactionReply reply = @@ -191,8 +202,8 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< @Override public ListenableFuture commit() { - OperationCallback operationCallback = cohortFutures.isEmpty() ? OperationCallback.NO_OP_CALLBACK : - new TransactionRateLimitingCallback(actorContext); + OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback : + OperationCallback.NO_OP_CALLBACK; return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(), CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback); @@ -250,7 +261,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< LOG.debug("Tx {} finish {}", transactionId, operationName); } - callback.run(); + callback.resume(); Future> combinedFuture = invokeCohorts(message); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCallback.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCallback.java index 678891446a..8ba2cb9ad9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCallback.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCallback.java @@ -9,7 +9,10 @@ package org.opendaylight.controller.cluster.datastore; import com.codahale.metrics.Timer; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Ticker; +import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; /** @@ -17,8 +20,18 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext; * transaction */ public class TransactionRateLimitingCallback implements OperationCallback{ + private static Ticker TICKER = Ticker.systemTicker(); + + private static enum State { + STOPPED, + RUNNING, + PAUSED + } + private final Timer commitTimer; - private Timer.Context timerContext; + private long startTime; + private long elapsedTime; + private volatile State state = State.STOPPED; TransactionRateLimitingCallback(ActorContext actorContext){ commitTimer = actorContext.getOperationTimer(ActorContext.COMMIT); @@ -26,13 +39,32 @@ public class TransactionRateLimitingCallback implements OperationCallback{ @Override public void run() { - timerContext = commitTimer.time(); + Preconditions.checkState(state == State.STOPPED, "state is not STOPPED"); + resume(); + } + + @Override + public void pause() { + if(state == State.RUNNING) { + elapsedTime += TICKER.read() - startTime; + state = State.PAUSED; + } + } + + @Override + public void resume() { + if(state != State.RUNNING) { + startTime = TICKER.read(); + state = State.RUNNING; + } } @Override public void success() { - Preconditions.checkState(timerContext != null, "Call run before success"); - timerContext.stop(); + Preconditions.checkState(state != State.STOPPED, "state is STOPPED"); + pause(); + commitTimer.update(elapsedTime, TimeUnit.NANOSECONDS); + state = State.STOPPED; } @Override @@ -42,4 +74,8 @@ public class TransactionRateLimitingCallback implements OperationCallback{ // not going to be useful - so we leave it as it is } + @VisibleForTesting + static void setTicker(Ticker ticker) { + TICKER = ticker; + } } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCallbackTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCallbackTest.java new file mode 100644 index 0000000000..6e282e7705 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCallbackTest.java @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import com.codahale.metrics.Timer; +import com.google.common.base.Ticker; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; + +/** + * Unit tests for TransactionRateLimitingCallback. + * + * @author Thomas Pantelis + */ +public class TransactionRateLimitingCallbackTest { + + @Mock + ActorContext mockContext; + + @Mock + Timer mockTimer; + + @Mock + Ticker mockTicker; + + TransactionRateLimitingCallback callback; + + @Before + public void setUp(){ + MockitoAnnotations.initMocks(this); + doReturn(mockTimer).when(mockContext).getOperationTimer(ActorContext.COMMIT); + callback = new TransactionRateLimitingCallback(mockContext); + TransactionRateLimitingCallback.setTicker(mockTicker); + } + + @Test + public void testSuccessWithoutPause() { + doReturn(1L).doReturn(201L).when(mockTicker).read(); + + callback.run(); + callback.success(); + + verify(mockTimer).update(200L, TimeUnit.NANOSECONDS); + } + + @Test + public void testSuccessWithPause() { + doReturn(1L).doReturn(201L).doReturn(301L).doReturn(351L).when(mockTicker).read(); + + callback.run(); + callback.pause(); + callback.pause(); + callback.resume(); + callback.resume(); + callback.success(); + + verify(mockTimer).update(250L, TimeUnit.NANOSECONDS); + } + + @Test + public void testFailure() { + doReturn(1L).when(mockTicker).read(); + + callback.run(); + callback.failure(); + + verify(mockTimer, never()).update(anyLong(), any(TimeUnit.class)); + } + + @Test + public void testSuccessWithoutRun(){ + try { + callback.success(); + fail("Expected IllegalStateException"); + } catch(IllegalStateException e){ + + } + + verify(mockTimer, never()).update(anyLong(), any(TimeUnit.class)); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCommitCallbackTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCommitCallbackTest.java deleted file mode 100644 index de3a56d078..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCommitCallbackTest.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.datastore; - -import static org.junit.Assert.fail; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import com.codahale.metrics.Snapshot; -import com.codahale.metrics.Timer; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; - -public class TransactionRateLimitingCommitCallbackTest { - @Mock - public ActorContext actorContext; - - @Mock - public DatastoreContext datastoreContext; - - @Mock - public Timer commitTimer; - - @Mock - private Timer.Context commitTimerContext; - - @Mock - private Snapshot commitSnapshot; - - @Before - public void setUp(){ - MockitoAnnotations.initMocks(this); - doReturn(datastoreContext).when(actorContext).getDatastoreContext(); - doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds(); - doReturn(commitTimer).when(actorContext).getOperationTimer("commit"); - doReturn(commitTimerContext).when(commitTimer).time(); - doReturn(commitSnapshot).when(commitTimer).getSnapshot(); - } - - @Test - public void testSuccess(){ - TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext); - commitCallback.run(); - commitCallback.success(); - - verify(commitTimerContext).stop(); - } - - @Test - public void testSuccessWithoutRun(){ - TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext); - - try { - commitCallback.success(); - fail("Expected IllegalStateException"); - } catch(IllegalStateException e){ - - } - verify(commitTimerContext, never()).stop(); - } - - - @Test - public void testFailure(){ - TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext); - commitCallback.run(); - commitCallback.failure(); - - verify(commitTimerContext, never()).stop(); - } - -} \ No newline at end of file