BUG-8452: make NoShardLeaderException retriable
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
index 0bd34aac3ee768eb9c4abc0f872b0e83cf67f547..86a78a701f2e50d0ccc0fd112fd631fc3e6f6bc9 100644 (file)
@@ -71,9 +71,8 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
@@ -148,8 +147,8 @@ public class ShardTest extends AbstractShardTest {
                 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH, dclActor,
                         AsyncDataBroker.DataChangeScope.BASE, true), getRef());
 
-                final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
-                        RegisterChangeListenerReply.class);
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("3 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
                 final String replyPath = reply.getListenerRegistrationPath().toString();
                 assertTrue("Incorrect reply path: " + replyPath,
                         replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
@@ -199,14 +198,11 @@ public class ShardTest extends AbstractShardTest {
                             // original ElectionTimeout message to proceed with the election.
                             firstElectionTimeout = false;
                             final ActorRef self = getSelf();
-                            new Thread() {
-                                @Override
-                                public void run() {
-                                    Uninterruptibles.awaitUninterruptibly(
-                                            onChangeListenerRegistered, 5, TimeUnit.SECONDS);
-                                    self.tell(message, self);
-                                }
-                            }.start();
+                            new Thread(() -> {
+                                Uninterruptibles.awaitUninterruptibly(
+                                        onChangeListenerRegistered, 5, TimeUnit.SECONDS);
+                                self.tell(message, self);
+                            }).start();
 
                             onFirstElectionTimeout.countDown();
                         } else {
@@ -238,8 +234,8 @@ public class ShardTest extends AbstractShardTest {
                 shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.SUBTREE, false),
                         getRef());
 
-                final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterChangeListenerReply.class);
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
                 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
                 // Sanity check - verify the shard is not the leader yet.
@@ -278,8 +274,8 @@ public class ShardTest extends AbstractShardTest {
 
                 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
 
-                final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
-                        RegisterDataTreeChangeListenerReply.class);
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("3 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
                 final String replyPath = reply.getListenerRegistrationPath().toString();
                 assertTrue("Incorrect reply path: " + replyPath,
                         replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
@@ -308,14 +304,11 @@ public class ShardTest extends AbstractShardTest {
                         if (message instanceof ElectionTimeout && firstElectionTimeout) {
                             firstElectionTimeout = false;
                             final ActorRef self = getSelf();
-                            new Thread() {
-                                @Override
-                                public void run() {
-                                    Uninterruptibles.awaitUninterruptibly(
-                                            onChangeListenerRegistered, 5, TimeUnit.SECONDS);
-                                    self.tell(message, self);
-                                }
-                            }.start();
+                            new Thread(() -> {
+                                Uninterruptibles.awaitUninterruptibly(
+                                        onChangeListenerRegistered, 5, TimeUnit.SECONDS);
+                                self.tell(message, self);
+                            }).start();
 
                             onFirstElectionTimeout.countDown();
                         } else {
@@ -342,8 +335,8 @@ public class ShardTest extends AbstractShardTest {
                 assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
 
                 shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
-                final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterDataTreeChangeListenerReply.class);
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
                 assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
 
                 shard.tell(FindLeader.INSTANCE, getRef());
@@ -2020,13 +2013,13 @@ public class ShardTest extends AbstractShardTest {
                 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
 
         final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext)
-                .schemaContext(SCHEMA_CONTEXT).props();
+                .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
 
         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder()
                 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
 
         final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext)
-                .schemaContext(SCHEMA_CONTEXT).props();
+                .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
 
         new ShardTestKit(getSystem()) {
             {
@@ -2146,8 +2139,8 @@ public class ShardTest extends AbstractShardTest {
 
                 shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true),
                         getRef());
-                final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterChangeListenerReply.class);
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
                 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
                 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
@@ -2174,14 +2167,14 @@ public class ShardTest extends AbstractShardTest {
                                 .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
                                 .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
                                         "akka://test/user/" + leaderShardID.toString()))
-                                .schemaContext(SCHEMA_CONTEXT).props()
+                                .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
 
                 final TestActorRef<Shard> leaderShard = actorFactory
                         .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
                                 .peerAddresses(Collections.singletonMap(followerShardID.toString(),
                                         "akka://test/user/" + followerShardID.toString()))
-                                .schemaContext(SCHEMA_CONTEXT).props()
+                                .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
 
                 leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
@@ -2196,9 +2189,9 @@ public class ShardTest extends AbstractShardTest {
                 followerShard.tell(
                         new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true),
                         getRef());
-                final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterChangeListenerReply.class);
-                assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
+                assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
                 writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
@@ -2228,8 +2221,8 @@ public class ShardTest extends AbstractShardTest {
                 waitUntilNoLeader(shard);
 
                 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
-                final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterDataTreeChangeListenerReply.class);
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
                 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
                 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
@@ -2261,8 +2254,8 @@ public class ShardTest extends AbstractShardTest {
                 waitUntilNoLeader(shard);
 
                 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
-                final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterDataTreeChangeListenerReply.class);
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
                 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
                 final ActorSelection regActor = getSystem().actorSelection(reply.getListenerRegistrationPath());
@@ -2293,14 +2286,14 @@ public class ShardTest extends AbstractShardTest {
                                 .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
                                 .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
                                         "akka://test/user/" + leaderShardID.toString()))
-                                .schemaContext(SCHEMA_CONTEXT).props()
+                                .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
 
                 final TestActorRef<Shard> leaderShard = actorFactory
                         .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
                                 .peerAddresses(Collections.singletonMap(followerShardID.toString(),
                                         "akka://test/user/" + followerShardID.toString()))
-                                .schemaContext(SCHEMA_CONTEXT).props()
+                                .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
 
                 leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
@@ -2313,8 +2306,8 @@ public class ShardTest extends AbstractShardTest {
                         actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
 
                 followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
-                final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterDataTreeChangeListenerReply.class);
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
                 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
                 writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));