import akka.testkit.TestActorRef;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy.CohortInfo;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.cluster.raft.TestActorFactory;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
-import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
private final List<TestActorRef<CohortActor>> cohortActors = new ArrayList<>();
+ private final TransactionIdentifier tx = nextTransactionId();
+
@Before
public void setUp() {
@Test
public void testCanCommitYesWithOneCohort() throws Exception {
ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
- newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
- CanCommitTransactionReply.yes(CURRENT_VERSION)))), "txn-1");
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
+ CanCommitTransactionReply.yes(CURRENT_VERSION)))), tx);
verifyCanCommit(proxy.canCommit(), true);
verifyCohortActors();
@Test
public void testCanCommitNoWithOneCohort() throws Exception {
ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
- newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
- CanCommitTransactionReply.no(CURRENT_VERSION)))), "txn-1");
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
+ CanCommitTransactionReply.no(CURRENT_VERSION)))), tx);
verifyCanCommit(proxy.canCommit(), false);
verifyCohortActors();
@Test
public void testCanCommitYesWithTwoCohorts() throws Exception {
List<CohortInfo> cohorts = Arrays.asList(
- newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
CanCommitTransactionReply.yes(CURRENT_VERSION))),
- newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
CanCommitTransactionReply.yes(CURRENT_VERSION))));
- ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
verifyCanCommit(proxy.canCommit(), true);
verifyCohortActors();
@Test
public void testCanCommitNoWithThreeCohorts() throws Exception {
List<CohortInfo> cohorts = Arrays.asList(
- newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
CanCommitTransactionReply.yes(CURRENT_VERSION))),
- newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
CanCommitTransactionReply.no(CURRENT_VERSION))),
- newCohortInfo(new CohortActor.Builder("txn-1")));
- ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+ newCohortInfo(new CohortActor.Builder(tx)));
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
verifyCanCommit(proxy.canCommit(), false);
verifyCohortActors();
@Test(expected = TestException.class)
public void testCanCommitWithExceptionFailure() throws Throwable {
ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
- newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(new TestException()))), "txn-1");
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(new TestException()))), tx);
propagateExecutionExceptionCause(proxy.canCommit());
}
@Test(expected = IllegalArgumentException.class)
public void testCanCommitWithInvalidResponseType() throws Throwable {
ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
- newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit("invalid"))), "txn-1");
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit("invalid"))), tx);
propagateExecutionExceptionCause(proxy.canCommit());
}
@Test(expected = TestException.class)
public void testCanCommitWithFailedCohortFuture() throws Throwable {
List<CohortInfo> cohorts = Arrays.asList(
- newCohortInfo(new CohortActor.Builder("txn-1")),
+ newCohortInfo(new CohortActor.Builder(tx)),
newCohortInfoWithFailedFuture(new TestException()),
- newCohortInfo(new CohortActor.Builder("txn-1")));
- ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+ newCohortInfo(new CohortActor.Builder(tx)));
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
propagateExecutionExceptionCause(proxy.canCommit());
}
@Test
public void testAllThreePhasesSuccessful() throws Exception {
List<CohortInfo> cohorts = Arrays.asList(
- newCohortInfo(new CohortActor.Builder("txn-1").
+ newCohortInfo(new CohortActor.Builder(tx).
expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
- newCohortInfo(new CohortActor.Builder("txn-1").
+ newCohortInfo(new CohortActor.Builder(tx).
expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))));
- ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
verifyCanCommit(proxy.canCommit(), true);
verifySuccessfulFuture(proxy.preCommit());
@Test(expected = TestException.class)
public void testCommitWithExceptionFailure() throws Throwable {
List<CohortInfo> cohorts = Arrays.asList(
- newCohortInfo(new CohortActor.Builder("txn-1").
+ newCohortInfo(new CohortActor.Builder(tx).
expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
- newCohortInfo(new CohortActor.Builder("txn-1").
+ newCohortInfo(new CohortActor.Builder(tx).
expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
expectCommit(new TestException())));
- ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
verifyCanCommit(proxy.canCommit(), true);
verifySuccessfulFuture(proxy.preCommit());
@Test(expected = IllegalArgumentException.class)
public void testCommitWithInvalidResponseType() throws Throwable {
ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
- newCohortInfo(new CohortActor.Builder("txn-1").
+ newCohortInfo(new CohortActor.Builder(tx).
expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
- expectCommit("invalid"))), "txn-1");
+ expectCommit("invalid"))), tx);
verifyCanCommit(proxy.canCommit(), true);
verifySuccessfulFuture(proxy.preCommit());
@Test
public void testAbort() throws Exception {
ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
- newCohortInfo(new CohortActor.Builder("txn-1").expectAbort(
- AbortTransactionReply.instance(CURRENT_VERSION)))), "txn-1");
+ newCohortInfo(new CohortActor.Builder(tx).expectAbort(
+ AbortTransactionReply.instance(CURRENT_VERSION)))), tx);
verifySuccessfulFuture(proxy.abort());
verifyCohortActors();
@Test
public void testAbortWithFailure() throws Exception {
ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
- newCohortInfo(new CohortActor.Builder("txn-1").expectAbort(new RuntimeException("mock")))), "txn-1");
+ newCohortInfo(new CohortActor.Builder(tx).expectAbort(new RuntimeException("mock")))), tx);
// The exception should not get propagated.
verifySuccessfulFuture(proxy.abort());
@Test
public void testAbortWithFailedCohortFuture() throws Throwable {
List<CohortInfo> cohorts = Arrays.asList(
- newCohortInfoWithFailedFuture(new TestException()),
- newCohortInfo(new CohortActor.Builder("txn-1")));
- ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+ newCohortInfoWithFailedFuture(new TestException()), newCohortInfo(new CohortActor.Builder(tx)));
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
verifySuccessfulFuture(proxy.abort());
verifyCohortActors();
@Test
public void testWithNoCohorts() throws Exception {
ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext,
- Collections.<CohortInfo>emptyList(), "txn-1");
-
- verifyCanCommit(proxy.canCommit(), true);
- verifySuccessfulFuture(proxy.preCommit());
- verifySuccessfulFuture(proxy.commit());
- verifyCohortActors();
- }
-
- @Test
- public void testBackwardsCompatibilityWithPreBoron() throws Exception {
- List<CohortInfo> cohorts = Arrays.asList(
- newCohortInfo(new CohortActor.Builder("txn-1").
- expectCanCommit(ThreePhaseCommitCohortMessages.CanCommitTransaction.class,
- CanCommitTransactionReply.yes(DataStoreVersions.LITHIUM_VERSION)).
- expectCommit(ThreePhaseCommitCohortMessages.CommitTransaction.class,
- CommitTransactionReply.instance(DataStoreVersions.LITHIUM_VERSION)),
- DataStoreVersions.LITHIUM_VERSION));
- ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+ Collections.<CohortInfo>emptyList(), tx);
verifyCanCommit(proxy.canCommit(), true);
verifySuccessfulFuture(proxy.preCommit());
});
}
- private CohortInfo newCohortInfoWithFailedFuture(Exception failure) {
+ private static CohortInfo newCohortInfoWithFailedFuture(Exception failure) {
return new CohortInfo(Futures.<ActorSelection>failed(failure), new Supplier<Short>() {
@Override
public Short get() {
@Override
public void onReceive(Object message) {
if(CanCommitTransaction.isSerializedType(message)) {
+ canCommitCount.incrementAndGet();
onMessage("CanCommitTransaction", message, CanCommitTransaction.fromSerializable(message),
builder.expCanCommitType, builder.canCommitReply);
- canCommitCount.incrementAndGet();
} else if(CommitTransaction.isSerializedType(message)) {
+ commitCount.incrementAndGet();
onMessage("CommitTransaction", message, CommitTransaction.fromSerializable(message),
builder.expCommitType, builder.commitReply);
- commitCount.incrementAndGet();
} else if(AbortTransaction.isSerializedType(message)) {
+ abortCount.incrementAndGet();
onMessage("AbortTransaction", message, AbortTransaction.fromSerializable(message),
builder.expAbortType, builder.abortReply);
- abortCount.incrementAndGet();
} else {
assertionError = new AssertionError("Unexpected message " + message);
}
private Object canCommitReply;
private Object commitReply;
private Object abortReply;
- private final String transactionId;
+ private final TransactionIdentifier transactionId;
- Builder(String transactionId) {
- this.transactionId = transactionId;
+ Builder(TransactionIdentifier transactionId) {
+ this.transactionId = Preconditions.checkNotNull(transactionId);
}
Builder expectCanCommit(Class<?> expCanCommitType, Object canCommitReply) {