import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
import akka.actor.ActorRef;
import akka.dispatch.Futures;
+import akka.util.Timeout;
import com.google.common.base.Optional;
import java.util.concurrent.TimeUnit;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.datastore.AbstractThreePhaseCommitCohort;
import org.opendaylight.controller.cluster.datastore.AbstractTransactionProxyTest;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy;
import org.opendaylight.controller.cluster.datastore.TransactionProxy;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import scala.concurrent.Future;
/**
* Unit tests for backwards compatibility with pre-Lithium versions.
return argThat(matcher);
}
+ private CanCommitTransaction eqCanCommitTransaction(final String transactionID) {
+ ArgumentMatcher<CanCommitTransaction> matcher = new ArgumentMatcher<CanCommitTransaction>() {
+ @Override
+ public boolean matches(Object argument) {
+ return ThreePhaseCommitCohortMessages.CanCommitTransaction.class.equals(argument.getClass()) &&
+ CanCommitTransaction.fromSerializable(argument).getTransactionID().equals(transactionID);
+ }
+ };
+
+ return argThat(matcher);
+ }
+
+ private CommitTransaction eqCommitTransaction(final String transactionID) {
+ ArgumentMatcher<CommitTransaction> matcher = new ArgumentMatcher<CommitTransaction>() {
+ @Override
+ public boolean matches(Object argument) {
+ return ThreePhaseCommitCohortMessages.CommitTransaction.class.equals(argument.getClass()) &&
+ CommitTransaction.fromSerializable(argument).getTransactionID().equals(transactionID);
+ }
+ };
+
+ return argThat(matcher);
+ }
+
+ private Future<Object> readySerializedTxReply(String path, short version) {
+ return Futures.successful(new ReadyTransactionReply(path, version).toSerializable());
+ }
+
private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, version);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, version,
+ DefaultShardStrategy.DEFAULT_SHARD);
NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(Futures.successful(new DeleteDataReply().toSerializable(version))).when(mockActorContext).
executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyDeleteData(TestModel.TEST_PATH));
- doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+ doReturn(readySerializedTxReply(actorRef.path().toString(), version)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
transactionProxy.delete(TestModel.TEST_PATH);
- DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+ AbstractThreePhaseCommitCohort<?> proxy = transactionProxy.ready();
- assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+ verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
- ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+ doReturn(Futures.successful(CanCommitTransactionReply.YES.toSerializable())).when(mockActorContext).
+ executeOperationAsync(eq(actorSelection(actorRef)),
+ eqCanCommitTransaction(transactionProxy.getIdentifier().toString()), any(Timeout.class));
- verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+ doReturn(Futures.successful(new CommitTransactionReply().toSerializable())).when(mockActorContext).
+ executeOperationAsync(eq(actorSelection(actorRef)),
+ eqCommitTransaction(transactionProxy.getIdentifier().toString()), any(Timeout.class));
+
+ Boolean canCommit = proxy.canCommit().get(3, TimeUnit.SECONDS);
+ assertEquals("canCommit", true, canCommit.booleanValue());
+
+ proxy.preCommit().get(3, TimeUnit.SECONDS);
+
+ proxy.commit().get(3, TimeUnit.SECONDS);
return actorRef;
}
// creating transaction actors for write-only Tx's.
public void testWriteOnlyCompatibilityWithHeliumR2Version() throws Exception {
short version = DataStoreVersions.HELIUM_2_VERSION;
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, version);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, version,
+ DefaultShardStrategy.DEFAULT_SHARD);
NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext).
executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode));
- doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+ doReturn(readySerializedTxReply(actorRef.path().toString(), version)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),