X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FAbstractTransactionProxyTest.java;h=1b9ea169b89a5f3bedd567e4607f6c94d6ac80b0;hb=8f56000dedbca1bbf61cbf3eef24ea19a333b62f;hp=0e1a3b73044696174592cb9b72febd3773531d4a;hpb=4b21f0e68572b4209c1e572a241cf1ef3c699327;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java index 0e1a3b7304..1b9ea169b8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java @@ -17,6 +17,7 @@ import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; + import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; @@ -26,8 +27,7 @@ import akka.testkit.JavaTestKit; import akka.util.Timeout; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; -import com.google.common.base.Objects; -import com.google.common.base.Optional; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.CheckedFuture; import com.typesafe.config.Config; @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.TimeUnit; import org.junit.AfterClass; import org.junit.Before; @@ -46,6 +47,8 @@ import org.mockito.ArgumentMatcher; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.TransactionProxyTest.TestException; import org.opendaylight.controller.cluster.datastore.config.Configuration; @@ -53,6 +56,7 @@ import org.opendaylight.controller.cluster.datastore.messages.BatchedModificatio import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; @@ -72,10 +76,9 @@ import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,7 +91,7 @@ import scala.concurrent.duration.Duration; * * @author Thomas Pantelis */ -public abstract class AbstractTransactionProxyTest { +public abstract class AbstractTransactionProxyTest extends AbstractTest { protected final Logger log = LoggerFactory.getLogger(getClass()); private static ActorSystem system; @@ -100,12 +103,22 @@ public abstract class AbstractTransactionProxyTest { public String findShard(YangInstanceIdentifier path) { return "junk"; } + + @Override + public YangInstanceIdentifier getPrefixForPath(YangInstanceIdentifier path) { + return YangInstanceIdentifier.EMPTY; + } }).put( "cars", new ShardStrategy() { @Override public String findShard(YangInstanceIdentifier path) { return "cars"; } + + @Override + public YangInstanceIdentifier getPrefixForPath(YangInstanceIdentifier path) { + return YangInstanceIdentifier.EMPTY; + } }).build(); @Override @@ -115,9 +128,9 @@ public abstract class AbstractTransactionProxyTest { @Override public String getModuleNameFromNameSpace(String nameSpace) { - if(TestModel.JUNK_QNAME.getNamespace().toASCIIString().equals(nameSpace)) { + if (TestModel.JUNK_QNAME.getNamespace().toASCIIString().equals(nameSpace)) { return "junk"; - } else if(CarsModel.BASE_QNAME.getNamespace().toASCIIString().equals(nameSpace)){ + } else if (CarsModel.BASE_QNAME.getNamespace().toASCIIString().equals(nameSpace)) { return "cars"; } return null; @@ -143,10 +156,10 @@ public abstract class AbstractTransactionProxyTest { @BeforeClass public static void setUpClass() throws IOException { - Config config = ConfigFactory.parseMap(ImmutableMap.builder(). - put("akka.actor.default-dispatcher.type", - "akka.testkit.CallingThreadDispatcherConfigurator").build()). - withFallback(ConfigFactory.load()); + Config config = ConfigFactory.parseMap(ImmutableMap.builder() + .put("akka.actor.default-dispatcher.type", + "akka.testkit.CallingThreadDispatcherConfigurator").build()) + .withFallback(ConfigFactory.load()); system = ActorSystem.create("test", config); } @@ -157,22 +170,24 @@ public abstract class AbstractTransactionProxyTest { } @Before - public void setUp(){ + public void setUp() { MockitoAnnotations.initMocks(this); schemaContext = TestModel.createTestContext(); doReturn(getSystem()).when(mockActorContext).getActorSystem(); doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher(); - doReturn(memberName).when(mockActorContext).getCurrentMemberName(); - doReturn(new ShardStrategyFactory(configuration)).when(mockActorContext).getShardStrategyFactory(); + doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName(); + doReturn(new ShardStrategyFactory(configuration, + LogicalDatastoreType.CONFIGURATION)).when(mockActorContext).getShardStrategyFactory(); doReturn(schemaContext).when(mockActorContext).getSchemaContext(); doReturn(new Timeout(operationTimeoutInSeconds, TimeUnit.SECONDS)).when(mockActorContext).getOperationTimeout(); doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext(); - mockComponentFactory = TransactionContextFactory.create(mockActorContext); + final ClientIdentifier mockClientId = MockIdentifiers.clientIdentifier(getClass(), memberName); + mockComponentFactory = new TransactionContextFactory(mockActorContext, mockClientId); Timer timer = new MetricRegistry().timer("test"); doReturn(timer).when(mockActorContext).getOperationTimer(any(String.class)); @@ -182,15 +197,15 @@ public abstract class AbstractTransactionProxyTest { return system; } - protected CreateTransaction eqCreateTransaction(final String memberName, + protected CreateTransaction eqCreateTransaction(final String expMemberName, final TransactionType type) { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { - if(CreateTransaction.SERIALIZABLE_CLASS.equals(argument.getClass())) { + if (CreateTransaction.class.equals(argument.getClass())) { CreateTransaction obj = CreateTransaction.fromSerializable(argument); - return obj.getTransactionId().startsWith(memberName) && - obj.getTransactionType() == type.ordinal(); + return obj.getTransactionId().getHistoryId().getClientId().getFrontendId().getMemberName() + .getName().equals(expMemberName) && obj.getTransactionType() == type.ordinal(); } return false; @@ -200,52 +215,26 @@ public abstract class AbstractTransactionProxyTest { return argThat(matcher); } - protected DataExists eqSerializedDataExists() { - ArgumentMatcher matcher = new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) && - DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH); - } - }; - - return argThat(matcher); - } - protected DataExists eqDataExists() { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { - return (argument instanceof DataExists) && - ((DataExists)argument).getPath().equals(TestModel.TEST_PATH); + return argument instanceof DataExists && ((DataExists)argument).getPath().equals(TestModel.TEST_PATH); } }; return argThat(matcher); } - protected ReadData eqSerializedReadData() { - return eqSerializedReadData(TestModel.TEST_PATH); - } - - protected ReadData eqSerializedReadData(final YangInstanceIdentifier path) { - ArgumentMatcher matcher = new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) && - ReadData.fromSerializable(argument).getPath().equals(path); - } - }; - - return argThat(matcher); + protected ReadData eqReadData() { + return eqReadData(TestModel.TEST_PATH); } - protected ReadData eqReadData() { + protected ReadData eqReadData(final YangInstanceIdentifier path) { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { - return (argument instanceof ReadData) && - ((ReadData)argument).getPath().equals(TestModel.TEST_PATH); + return argument instanceof ReadData && ((ReadData)argument).getPath().equals(path); } }; @@ -256,25 +245,13 @@ public abstract class AbstractTransactionProxyTest { return Futures.successful((Object)new ReadyTransactionReply(path)); } - protected Future readSerializedDataReply(NormalizedNode data, - short transactionVersion) { - return Futures.successful(new ReadDataReply(data, transactionVersion).toSerializable()); - } - - protected Future readSerializedDataReply(NormalizedNode data) { - return readSerializedDataReply(data, DataStoreVersions.CURRENT_VERSION); - } protected Future readDataReply(NormalizedNode data) { return Futures.successful(new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION)); } - protected Future dataExistsSerializedReply(boolean exists) { - return Futures.successful(DataExistsReply.create(exists).toSerializable()); - } - protected Future dataExistsReply(boolean exists) { - return Futures.successful(DataExistsReply.create(exists)); + return Futures.successful(new DataExistsReply(exists, DataStoreVersions.CURRENT_VERSION)); } protected Future batchedModificationsReply(int count) { @@ -292,7 +269,12 @@ public abstract class AbstractTransactionProxyTest { protected void expectBatchedModifications(ActorRef actorRef, int count) { doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(BatchedModifications.class)); + eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class)); + } + + protected void expectBatchedModifications(int count) { + doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), isA(BatchedModifications.class), any(Timeout.class)); } protected void expectBatchedModificationsReady(ActorRef actorRef) { @@ -302,22 +284,17 @@ public abstract class AbstractTransactionProxyTest { protected void expectBatchedModificationsReady(ActorRef actorRef, boolean doCommitOnReady) { doReturn(doCommitOnReady ? Futures.successful(new CommitTransactionReply().toSerializable()) : readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(BatchedModifications.class)); - } - - protected void expectBatchedModifications(int count) { - doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), isA(BatchedModifications.class)); + eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class)); } protected void expectIncompleteBatchedModifications() { doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), isA(BatchedModifications.class)); + any(ActorSelection.class), isA(BatchedModifications.class), any(Timeout.class)); } protected void expectFailedBatchedModifications(ActorRef actorRef) { doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(BatchedModifications.class)); + eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class)); } protected void expectReadyLocalTransaction(ActorRef actorRef, boolean doCommitOnReady) { @@ -326,30 +303,17 @@ public abstract class AbstractTransactionProxyTest { eq(actorSelection(actorRef)), isA(ReadyLocalTransaction.class), any(Timeout.class)); } - protected CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){ - return CreateTransactionReply.newBuilder() - .setTransactionActorPath(actorRef.path().toString()) - .setTransactionId("txn-1") - .setMessageVersion(transactionVersion) - .build(); + protected CreateTransactionReply createTransactionReply(ActorRef actorRef, short transactionVersion) { + return new CreateTransactionReply(actorRef.path().toString(), nextTransactionId(), transactionVersion); } protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) { return setupActorContextWithoutInitialCreateTransaction(actorSystem, DefaultShardStrategy.DEFAULT_SHARD); } - protected Future primaryShardInfoReply(ActorSystem actorSystem, ActorRef actorRef) { - return primaryShardInfoReply(actorSystem, actorRef, DataStoreVersions.CURRENT_VERSION); - } - - protected Future primaryShardInfoReply(ActorSystem actorSystem, ActorRef actorRef, - short transactionVersion) { - return Futures.successful(new PrimaryShardInfo(actorSystem.actorSelection(actorRef.path()), - transactionVersion, Optional.absent())); - } - protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem, String shardName) { - return setupActorContextWithoutInitialCreateTransaction(actorSystem, shardName, DataStoreVersions.CURRENT_VERSION); + return setupActorContextWithoutInitialCreateTransaction(actorSystem, shardName, + DataStoreVersions.CURRENT_VERSION); } protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem, String shardName, @@ -357,17 +321,25 @@ public abstract class AbstractTransactionProxyTest { ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); log.info("Created mock shard actor {}", actorRef); - doReturn(actorSystem.actorSelection(actorRef.path())). - when(mockActorContext).actorSelection(actorRef.path().toString()); - - doReturn(primaryShardInfoReply(actorSystem, actorRef, transactionVersion)). - when(mockActorContext).findPrimaryShardAsync(eq(shardName)); + doReturn(actorSystem.actorSelection(actorRef.path())) + .when(mockActorContext).actorSelection(actorRef.path().toString()); - doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString()); + doReturn(primaryShardInfoReply(actorSystem, actorRef, transactionVersion)) + .when(mockActorContext).findPrimaryShardAsync(eq(shardName)); return actorRef; } + protected Future primaryShardInfoReply(ActorSystem actorSystem, ActorRef actorRef) { + return primaryShardInfoReply(actorSystem, actorRef, DataStoreVersions.CURRENT_VERSION); + } + + protected Future primaryShardInfoReply(ActorSystem actorSystem, ActorRef actorRef, + short transactionVersion) { + return Futures.successful(new PrimaryShardInfo(actorSystem.actorSelection(actorRef.path()), + transactionVersion)); + } + protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type, short transactionVersion, String shardName) { ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem, shardName, @@ -381,18 +353,18 @@ public abstract class AbstractTransactionProxyTest { TransactionType type, short transactionVersion, String prefix, ActorRef shardActorRef) { ActorRef txActorRef; - if(type == TransactionType.WRITE_ONLY && transactionVersion >= DataStoreVersions.LITHIUM_VERSION && - dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled()) { + if (type == TransactionType.WRITE_ONLY + && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled()) { txActorRef = shardActorRef; } else { txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); log.info("Created mock shard Tx actor {}", txActorRef); - doReturn(actorSystem.actorSelection(txActorRef.path())). - when(mockActorContext).actorSelection(txActorRef.path().toString()); + doReturn(actorSystem.actorSelection(txActorRef.path())) + .when(mockActorContext).actorSelection(txActorRef.path().toString()); - doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext). - executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), + doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext) + .executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(prefix, type), any(Timeout.class)); } @@ -410,19 +382,21 @@ public abstract class AbstractTransactionProxyTest { shardName); } - protected void propagateReadFailedExceptionCause(CheckedFuture future) - throws Throwable { - + protected void propagateReadFailedExceptionCause(CheckedFuture future) throws Exception { try { future.checkedGet(5, TimeUnit.SECONDS); fail("Expected ReadFailedException"); - } catch(ReadFailedException e) { + } catch (ReadFailedException e) { assertNotNull("Expected a cause", e.getCause()); - if(e.getCause().getCause() != null) { - throw e.getCause().getCause(); + Throwable cause; + if (e.getCause().getCause() != null) { + cause = e.getCause().getCause(); } else { - throw e.getCause(); + cause = e.getCause(); } + + Throwables.propagateIfInstanceOf(cause, Exception.class); + Throwables.propagate(cause); } } @@ -430,7 +404,7 @@ public abstract class AbstractTransactionProxyTest { ArgumentCaptor batchedModificationsCaptor = ArgumentCaptor.forClass(BatchedModifications.class); verify(mockActorContext, Mockito.atLeastOnce()).executeOperationAsync( - eq(actorSelection(actorRef)), batchedModificationsCaptor.capture()); + eq(actorSelection(actorRef)), batchedModificationsCaptor.capture(), any(Timeout.class)); List batchedModifications = filterCaptured( batchedModificationsCaptor, BatchedModifications.class); @@ -439,8 +413,8 @@ public abstract class AbstractTransactionProxyTest { protected List filterCaptured(ArgumentCaptor captor, Class type) { List captured = new ArrayList<>(); - for(T c: captor.getAllValues()) { - if(type.isInstance(c)) { + for (T c: captor.getAllValues()) { + if (type.isInstance(c)) { captured.add(c); } } @@ -466,57 +440,57 @@ public abstract class AbstractTransactionProxyTest { assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size()); assertEquals("isReady", expIsReady, batchedModifications.isReady()); assertEquals("isDoCommitOnReady", expIsDoCommitOnReady, batchedModifications.isDoCommitOnReady()); - for(int i = 0; i < batchedModifications.getModifications().size(); i++) { + for (int i = 0; i < batchedModifications.getModifications().size(); i++) { Modification actual = batchedModifications.getModifications().get(i); assertEquals("Modification type", expected[i].getClass(), actual.getClass()); assertEquals("getPath", ((AbstractModification)expected[i]).getPath(), ((AbstractModification)actual).getPath()); - if(actual instanceof WriteModification) { + if (actual instanceof WriteModification) { assertEquals("getData", ((WriteModification)expected[i]).getData(), ((WriteModification)actual).getData()); } } } + @SuppressWarnings("checkstyle:IllegalCatch") protected void verifyCohortFutures(AbstractThreePhaseCommitCohort proxy, Object... expReplies) throws Exception { - assertEquals("getReadyOperationFutures size", expReplies.length, - proxy.getCohortFutures().size()); - - List futureResults = new ArrayList<>(); - for( Future future: proxy.getCohortFutures()) { - assertNotNull("Ready operation Future is null", future); - try { - futureResults.add(Await.result(future, Duration.create(5, TimeUnit.SECONDS))); - } catch(Exception e) { - futureResults.add(e); - } + assertEquals("getReadyOperationFutures size", expReplies.length, + proxy.getCohortFutures().size()); + + List futureResults = new ArrayList<>(); + for (Future future : proxy.getCohortFutures()) { + assertNotNull("Ready operation Future is null", future); + try { + futureResults.add(Await.result(future, Duration.create(5, TimeUnit.SECONDS))); + } catch (Exception e) { + futureResults.add(e); } + } - for(int i = 0; i < expReplies.length; i++) { - Object expReply = expReplies[i]; - boolean found = false; - Iterator iter = futureResults.iterator(); - while(iter.hasNext()) { - Object actual = iter.next(); - if(CommitTransactionReply.SERIALIZABLE_CLASS.isInstance(expReply) && - CommitTransactionReply.SERIALIZABLE_CLASS.isInstance(actual)) { - found = true; - } else if(expReply instanceof ActorSelection && Objects.equal(expReply, actual)) { - found = true; - } else if(expReply instanceof Class && ((Class)expReply).isInstance(actual)) { - found = true; - } - - if(found) { - iter.remove(); - break; - } + for (Object expReply : expReplies) { + boolean found = false; + Iterator iter = futureResults.iterator(); + while (iter.hasNext()) { + Object actual = iter.next(); + if (CommitTransactionReply.isSerializedType(expReply) + && CommitTransactionReply.isSerializedType(actual)) { + found = true; + } else if (expReply instanceof ActorSelection && Objects.equals(expReply, actual)) { + found = true; + } else if (expReply instanceof Class && ((Class) expReply).isInstance(actual)) { + found = true; } - if(!found) { - fail(String.format("No cohort Future response found for %s. Actual: %s", expReply, futureResults)); + if (found) { + iter.remove(); + break; } } + + if (!found) { + fail(String.format("No cohort Future response found for %s. Actual: %s", expReply, futureResults)); + } } + } }