X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardTest.java;h=e8f7e32310b8971de356a4cc3f26049f239b417b;hb=731e7284cf0895fdb1b89427f91762e80e67c2ff;hp=76745a442ca0e9d09100e447e7c6a25ce54d6162;hpb=20a32e6459fd1e27e7669bf1ebc7742b96787b94;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 76745a442c..e8f7e32310 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -38,6 +38,7 @@ import java.io.IOException; import java.util.Collections; import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -70,7 +71,6 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; @@ -79,7 +79,6 @@ import org.opendaylight.controller.cluster.datastore.modification.MergeModificat import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; -import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply; @@ -104,7 +103,6 @@ import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -127,134 +125,6 @@ public class ShardTest extends AbstractShardTest { private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in " + "InMemorySnapshotStore and journal recovery seq number will start from 1"; - @Test - public void testRegisterChangeListener() throws Exception { - new ShardTestKit(getSystem()) { - { - final TestActorRef shard = actorFactory.createTestActor( - newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testRegisterChangeListener"); - - waitUntilLeader(shard); - - shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender()); - - final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, - TestModel.TEST_PATH), "testRegisterChangeListener-DataChangeListener"); - - shard.tell(new RegisterChangeListener(TestModel.TEST_PATH, dclActor, - AsyncDataBroker.DataChangeScope.BASE, true), getRef()); - - final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("3 seconds"), - RegisterDataTreeNotificationListenerReply.class); - final String replyPath = reply.getListenerRegistrationPath().toString(); - assertTrue("Incorrect reply path: " + replyPath, - replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*")); - - final YangInstanceIdentifier path = TestModel.TEST_PATH; - writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - - listener.waitForChangeEvents(path); - } - }; - } - - @SuppressWarnings("serial") - @Test - public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception { - // This test tests the timing window in which a change listener is registered before the - // shard becomes the leader. We verify that the listener is registered and notified of the - // existing data when the shard becomes the leader. - // For this test, we want to send the RegisterChangeListener message after the shard - // has recovered from persistence and before it becomes the leader. So we subclass - // Shard to override onReceiveCommand and, when the first ElectionTimeout is received, - // we know that the shard has been initialized to a follower and has started the - // election process. The following 2 CountDownLatches are used to coordinate the - // ElectionTimeout with the sending of the RegisterChangeListener message. - final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1); - final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1); - final Creator creator = new Creator() { - boolean firstElectionTimeout = true; - - @Override - public Shard create() throws Exception { - // Use a non persistent provider because this test actually invokes persist on the journal - // this will cause all other messages to not be queued properly after that. - // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when - // it does do a persist) - return new Shard(newShardBuilder()) { - @Override - public void handleCommand(final Object message) { - if (message instanceof ElectionTimeout && firstElectionTimeout) { - // Got the first ElectionTimeout. We don't forward it to the - // base Shard yet until we've sent the RegisterChangeListener - // message. So we signal the onFirstElectionTimeout latch to tell - // the main thread to send the RegisterChangeListener message and - // start a thread to wait on the onChangeListenerRegistered latch, - // which the main thread signals after it has sent the message. - // After the onChangeListenerRegistered is triggered, we send the - // original ElectionTimeout message to proceed with the election. - firstElectionTimeout = false; - final ActorRef self = getSelf(); - new Thread(() -> { - Uninterruptibles.awaitUninterruptibly( - onChangeListenerRegistered, 5, TimeUnit.SECONDS); - self.tell(message, self); - }).start(); - - onFirstElectionTimeout.countDown(); - } else { - super.handleCommand(message); - } - } - }; - } - }; - - setupInMemorySnapshotStore(); - - final YangInstanceIdentifier path = TestModel.TEST_PATH; - final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path), - "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener"); - - final TestActorRef shard = actorFactory.createTestActor( - Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), - "testRegisterChangeListenerWhenNotLeaderInitially"); - - new ShardTestKit(getSystem()) { - { - // Wait until the shard receives the first ElectionTimeout - // message. - assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS)); - - // Now send the RegisterChangeListener and wait for the reply. - shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.SUBTREE, false), - getRef()); - - final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"), - RegisterDataTreeNotificationListenerReply.class); - assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath()); - - // Sanity check - verify the shard is not the leader yet. - shard.tell(FindLeader.INSTANCE, getRef()); - final FindLeaderReply findLeadeReply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class); - assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent()); - - // Signal the onChangeListenerRegistered latch to tell the - // thread above to proceed - // with the election process. - onChangeListenerRegistered.countDown(); - - // Wait for the shard to become the leader and notify our - // listener with the existing - // data in the store. - listener.waitForChangeEvents(path); - } - }; - } - @Test public void testRegisterDataTreeChangeListener() throws Exception { new ShardTestKit(getSystem()) { @@ -802,7 +672,7 @@ public class ShardTest extends AbstractShardTest { final TransactionIdentifier transactionID = nextTransactionId(); final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION); - batched.setReady(true); + batched.setReady(); batched.setTotalMessagesSent(2); shard.tell(batched, getRef()); @@ -845,7 +715,7 @@ public class ShardTest extends AbstractShardTest { final Throwable cause = failure.cause(); batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION); - batched.setReady(true); + batched.setReady(); batched.setTotalMessagesSent(2); shard.tell(batched, getRef()); @@ -976,7 +846,8 @@ public class ShardTest extends AbstractShardTest { failure = expectMsgClass(Failure.class); assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass()); - shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true), getRef()); + shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true, Optional.empty()), + getRef()); failure = expectMsgClass(Failure.class); assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass()); } @@ -1041,7 +912,8 @@ public class ShardTest extends AbstractShardTest { final TransactionIdentifier txId = nextTransactionId(); modification.ready(); - final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true); + final ReadyLocalTransaction readyMessage = + new ReadyLocalTransaction(txId, modification, true, Optional.empty()); shard.tell(readyMessage, getRef()); @@ -1074,7 +946,8 @@ public class ShardTest extends AbstractShardTest { final TransactionIdentifier txId = nextTransactionId(); modification.ready(); - final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false); + final ReadyLocalTransaction readyMessage = + new ReadyLocalTransaction(txId, modification, false, Optional.empty()); shard.tell(readyMessage, getRef()); @@ -1702,7 +1575,7 @@ public class ShardTest extends AbstractShardTest { .apply(modification3); modification3.ready(); final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, - true); + true, Optional.empty()); shard.tell(readyMessage, getRef()); // Commit the first Tx. After completing, the second should @@ -2119,90 +1992,6 @@ public class ShardTest extends AbstractShardTest { assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus()); } - @Test - public void testClusteredDataChangeListenerWithDelayedRegistration() throws Exception { - new ShardTestKit(getSystem()) { - { - final String testName = "testClusteredDataChangeListenerWithDelayedRegistration"; - dataStoreContextBuilder.shardElectionTimeoutFactor(1000) - .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()); - - final YangInstanceIdentifier path = TestModel.TEST_PATH; - final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path), - actorFactory.generateActorId(testName + "-DataChangeListener")); - - setupInMemorySnapshotStore(); - - final TestActorRef shard = actorFactory.createTestActor( - newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()), - actorFactory.generateActorId(testName + "-shard")); - - waitUntilNoLeader(shard); - - shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), - getRef()); - final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"), - RegisterDataTreeNotificationListenerReply.class); - assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath()); - - shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()) - .customRaftPolicyImplementation(null).build(), ActorRef.noSender()); - - listener.waitForChangeEvents(); - } - }; - } - - @Test - public void testClusteredDataChangeListenerRegistration() throws Exception { - new ShardTestKit(getSystem()) { - { - final String testName = "testClusteredDataChangeListenerRegistration"; - final ShardIdentifier followerShardID = ShardIdentifier.create("inventory", - MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config"); - - final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory", - MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config"); - - final TestActorRef followerShard = actorFactory - .createTestActor(Shard.builder().id(followerShardID) - .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()) - .peerAddresses(Collections.singletonMap(leaderShardID.toString(), - "akka://test/user/" + leaderShardID.toString())) - .schemaContextProvider(() -> SCHEMA_CONTEXT).props() - .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString()); - - final TestActorRef leaderShard = actorFactory - .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()) - .peerAddresses(Collections.singletonMap(followerShardID.toString(), - "akka://test/user/" + followerShardID.toString())) - .schemaContextProvider(() -> SCHEMA_CONTEXT).props() - .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString()); - - leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender()); - final String leaderPath = waitUntilLeader(followerShard); - assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath); - - final YangInstanceIdentifier path = TestModel.TEST_PATH; - final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path), - actorFactory.generateActorId(testName + "-DataChangeListener")); - - followerShard.tell( - new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), - getRef()); - final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"), - RegisterDataTreeNotificationListenerReply.class); - assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath()); - - writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - - listener.waitForChangeEvents(); - } - }; - } - @Test public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception { new ShardTestKit(getSystem()) {