import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import akka.actor.ActorPath;
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.dispatch.Futures;
+import akka.util.Timeout;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
@Mock
private ActorContext actorContext;
+ @Mock
+ private DatastoreContext datastoreContext;
+
+ @Mock
+ private Timer commitTimer;
+
+ @Mock
+ private Timer.Context commitTimerContext;
+
+ @Mock
+ private Snapshot commitSnapshot;
+
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
doReturn(getSystem()).when(actorContext).getActorSystem();
+ doReturn(datastoreContext).when(actorContext).getDatastoreContext();
+ doReturn(100).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
+ doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
+ doReturn(commitTimerContext).when(commitTimer).time();
+ doReturn(commitSnapshot).when(commitTimer).getSnapshot();
+ doReturn(TimeUnit.MILLISECONDS.toNanos(2000) * 1.0).when(commitSnapshot).get98thPercentile();
+ doReturn(10.0).when(actorContext).getTxCreationLimit();
}
private Future<ActorSelection> newCohort() {
}
stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
- isA(requestType));
+ isA(requestType), any(Timeout.class));
}
private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
verify(actorContext, times(nCohorts)).executeOperationAsync(
- any(ActorSelection.class), isA(requestType));
+ any(ActorSelection.class), isA(requestType), any(Timeout.class));
}
private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
try {
propagateExecutionExceptionCause(proxy.commit());
} finally {
+
+ verify(actorContext, never()).setTxCreationLimit(anyLong());
verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
}
+
}
@Test
setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
new CommitTransactionReply(), new CommitTransactionReply());
+ assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+
proxy.canCommit().get(5, TimeUnit.SECONDS);
proxy.preCommit().get(5, TimeUnit.SECONDS);
proxy.commit().get(5, TimeUnit.SECONDS);
verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
+
+ // Verify that the creation limit was changed to 0.5 (based on setup)
+ verify(actorContext, timeout(5000)).setTxCreationLimit(0.5);
+ }
+
+ @Test
+ public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxy(0);
+
+ assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+
+ proxy.canCommit().get(5, TimeUnit.SECONDS);
+ proxy.preCommit().get(5, TimeUnit.SECONDS);
+ proxy.commit().get(5, TimeUnit.SECONDS);
+
+ verify(actorContext, never()).setTxCreationLimit(anyLong());
}
}