@Override
public void failure() {
}
+
+ @Override
+ public void pause() {
+ }
+
+ @Override
+ public void resume() {
+ }
};
class Reference extends AtomicReference<OperationCallback> {
}
void run();
+ void pause();
+ void resume();
void success();
void failure();
}
private final List<Future<ActorSelection>> cohortFutures;
private volatile List<ActorSelection> cohorts;
private final String transactionId;
+ private volatile OperationCallback commitOperationCallback;
public ThreePhaseCommitCohortProxy(ActorContext actorContext,
List<Future<ActorSelection>> cohortFutures, String transactionId) {
return;
}
+ commitOperationCallback = cohortFutures.isEmpty() ? OperationCallback.NO_OP_CALLBACK :
+ new TransactionRateLimitingCallback(actorContext);
+
+ commitOperationCallback.run();
+
final Object message = new CanCommitTransaction(transactionId).toSerializable();
final Iterator<ActorSelection> iterator = cohorts.iterator();
LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
}
returnFuture.setException(failure);
+ commitOperationCallback.failure();
return;
}
+ // Only the first call to pause takes effect - subsequent calls before resume are no-ops. So
+ // this means we'll only time the first transaction canCommit which should be fine.
+ commitOperationCallback.pause();
+
boolean result = true;
if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
CanCommitTransactionReply reply =
@Override
public ListenableFuture<Void> commit() {
- OperationCallback operationCallback = cohortFutures.isEmpty() ? OperationCallback.NO_OP_CALLBACK :
- new TransactionRateLimitingCallback(actorContext);
+ OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
+ OperationCallback.NO_OP_CALLBACK;
return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
LOG.debug("Tx {} finish {}", transactionId, operationName);
}
- callback.run();
+ callback.resume();
Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
package org.opendaylight.controller.cluster.datastore;
import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
/**
* transaction
*/
public class TransactionRateLimitingCallback implements OperationCallback{
+ private static Ticker TICKER = Ticker.systemTicker();
+
+ private static enum State {
+ STOPPED,
+ RUNNING,
+ PAUSED
+ }
+
private final Timer commitTimer;
- private Timer.Context timerContext;
+ private long startTime;
+ private long elapsedTime;
+ private volatile State state = State.STOPPED;
TransactionRateLimitingCallback(ActorContext actorContext){
commitTimer = actorContext.getOperationTimer(ActorContext.COMMIT);
@Override
public void run() {
- timerContext = commitTimer.time();
+ Preconditions.checkState(state == State.STOPPED, "state is not STOPPED");
+ resume();
+ }
+
+ @Override
+ public void pause() {
+ if(state == State.RUNNING) {
+ elapsedTime += TICKER.read() - startTime;
+ state = State.PAUSED;
+ }
+ }
+
+ @Override
+ public void resume() {
+ if(state != State.RUNNING) {
+ startTime = TICKER.read();
+ state = State.RUNNING;
+ }
}
@Override
public void success() {
- Preconditions.checkState(timerContext != null, "Call run before success");
- timerContext.stop();
+ Preconditions.checkState(state != State.STOPPED, "state is STOPPED");
+ pause();
+ commitTimer.update(elapsedTime, TimeUnit.NANOSECONDS);
+ state = State.STOPPED;
}
@Override
// not going to be useful - so we leave it as it is
}
+ @VisibleForTesting
+ static void setTicker(Ticker ticker) {
+ TICKER = ticker;
+ }
}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Ticker;
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+
+/**
+ * Unit tests for TransactionRateLimitingCallback.
+ *
+ * @author Thomas Pantelis
+ */
+public class TransactionRateLimitingCallbackTest {
+
+ @Mock
+ ActorContext mockContext;
+
+ @Mock
+ Timer mockTimer;
+
+ @Mock
+ Ticker mockTicker;
+
+ TransactionRateLimitingCallback callback;
+
+ @Before
+ public void setUp(){
+ MockitoAnnotations.initMocks(this);
+ doReturn(mockTimer).when(mockContext).getOperationTimer(ActorContext.COMMIT);
+ callback = new TransactionRateLimitingCallback(mockContext);
+ TransactionRateLimitingCallback.setTicker(mockTicker);
+ }
+
+ @Test
+ public void testSuccessWithoutPause() {
+ doReturn(1L).doReturn(201L).when(mockTicker).read();
+
+ callback.run();
+ callback.success();
+
+ verify(mockTimer).update(200L, TimeUnit.NANOSECONDS);
+ }
+
+ @Test
+ public void testSuccessWithPause() {
+ doReturn(1L).doReturn(201L).doReturn(301L).doReturn(351L).when(mockTicker).read();
+
+ callback.run();
+ callback.pause();
+ callback.pause();
+ callback.resume();
+ callback.resume();
+ callback.success();
+
+ verify(mockTimer).update(250L, TimeUnit.NANOSECONDS);
+ }
+
+ @Test
+ public void testFailure() {
+ doReturn(1L).when(mockTicker).read();
+
+ callback.run();
+ callback.failure();
+
+ verify(mockTimer, never()).update(anyLong(), any(TimeUnit.class));
+ }
+
+ @Test
+ public void testSuccessWithoutRun(){
+ try {
+ callback.success();
+ fail("Expected IllegalStateException");
+ } catch(IllegalStateException e){
+
+ }
+
+ verify(mockTimer, never()).update(anyLong(), any(TimeUnit.class));
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import com.codahale.metrics.Snapshot;
-import com.codahale.metrics.Timer;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-
-public class TransactionRateLimitingCommitCallbackTest {
- @Mock
- public ActorContext actorContext;
-
- @Mock
- public DatastoreContext datastoreContext;
-
- @Mock
- public Timer commitTimer;
-
- @Mock
- private Timer.Context commitTimerContext;
-
- @Mock
- private Snapshot commitSnapshot;
-
- @Before
- public void setUp(){
- MockitoAnnotations.initMocks(this);
- doReturn(datastoreContext).when(actorContext).getDatastoreContext();
- doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
- doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
- doReturn(commitTimerContext).when(commitTimer).time();
- doReturn(commitSnapshot).when(commitTimer).getSnapshot();
- }
-
- @Test
- public void testSuccess(){
- TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
- commitCallback.run();
- commitCallback.success();
-
- verify(commitTimerContext).stop();
- }
-
- @Test
- public void testSuccessWithoutRun(){
- TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
-
- try {
- commitCallback.success();
- fail("Expected IllegalStateException");
- } catch(IllegalStateException e){
-
- }
- verify(commitTimerContext, never()).stop();
- }
-
-
- @Test
- public void testFailure(){
- TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
- commitCallback.run();
- commitCallback.failure();
-
- verify(commitTimerContext, never()).stop();
- }
-
-}
\ No newline at end of file