import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
shard.tell(new RegisterChangeListener(TestModel.TEST_PATH, dclActor,
AsyncDataBroker.DataChangeScope.BASE, true), getRef());
- final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
- RegisterChangeListenerReply.class);
+ final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("3 seconds"),
+ RegisterDataTreeNotificationListenerReply.class);
final String replyPath = reply.getListenerRegistrationPath().toString();
assertTrue("Incorrect reply path: " + replyPath,
replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
// original ElectionTimeout message to proceed with the election.
firstElectionTimeout = false;
final ActorRef self = getSelf();
- new Thread() {
- @Override
- public void run() {
- Uninterruptibles.awaitUninterruptibly(
- onChangeListenerRegistered, 5, TimeUnit.SECONDS);
- self.tell(message, self);
- }
- }.start();
+ new Thread(() -> {
+ Uninterruptibles.awaitUninterruptibly(
+ onChangeListenerRegistered, 5, TimeUnit.SECONDS);
+ self.tell(message, self);
+ }).start();
onFirstElectionTimeout.countDown();
} else {
shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.SUBTREE, false),
getRef());
- final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterChangeListenerReply.class);
+ final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterDataTreeNotificationListenerReply.class);
assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
// Sanity check - verify the shard is not the leader yet.
shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
- final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
- RegisterDataTreeChangeListenerReply.class);
+ final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("3 seconds"),
+ RegisterDataTreeNotificationListenerReply.class);
final String replyPath = reply.getListenerRegistrationPath().toString();
assertTrue("Incorrect reply path: " + replyPath,
replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
if (message instanceof ElectionTimeout && firstElectionTimeout) {
firstElectionTimeout = false;
final ActorRef self = getSelf();
- new Thread() {
- @Override
- public void run() {
- Uninterruptibles.awaitUninterruptibly(
- onChangeListenerRegistered, 5, TimeUnit.SECONDS);
- self.tell(message, self);
- }
- }.start();
+ new Thread(() -> {
+ Uninterruptibles.awaitUninterruptibly(
+ onChangeListenerRegistered, 5, TimeUnit.SECONDS);
+ self.tell(message, self);
+ }).start();
onFirstElectionTimeout.countDown();
} else {
assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
- final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterDataTreeChangeListenerReply.class);
+ final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterDataTreeNotificationListenerReply.class);
assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
shard.tell(FindLeader.INSTANCE, getRef());
.shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext)
- .schemaContext(SCHEMA_CONTEXT).props();
+ .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder()
.shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext)
- .schemaContext(SCHEMA_CONTEXT).props();
+ .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
new ShardTestKit(getSystem()) {
{
shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true),
getRef());
- final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterChangeListenerReply.class);
+ final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterDataTreeNotificationListenerReply.class);
assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
.datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
.peerAddresses(Collections.singletonMap(leaderShardID.toString(),
"akka://test/user/" + leaderShardID.toString()))
- .schemaContext(SCHEMA_CONTEXT).props()
+ .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
final TestActorRef<Shard> leaderShard = actorFactory
.createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
.peerAddresses(Collections.singletonMap(followerShardID.toString(),
"akka://test/user/" + followerShardID.toString()))
- .schemaContext(SCHEMA_CONTEXT).props()
+ .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
followerShard.tell(
new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true),
getRef());
- final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterChangeListenerReply.class);
- assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+ final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterDataTreeNotificationListenerReply.class);
+ assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
waitUntilNoLeader(shard);
shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
- final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterDataTreeChangeListenerReply.class);
+ final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterDataTreeNotificationListenerReply.class);
assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
waitUntilNoLeader(shard);
shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
- final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterDataTreeChangeListenerReply.class);
+ final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterDataTreeNotificationListenerReply.class);
assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
final ActorSelection regActor = getSystem().actorSelection(reply.getListenerRegistrationPath());
.datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
.peerAddresses(Collections.singletonMap(leaderShardID.toString(),
"akka://test/user/" + leaderShardID.toString()))
- .schemaContext(SCHEMA_CONTEXT).props()
+ .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
final TestActorRef<Shard> leaderShard = actorFactory
.createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
.peerAddresses(Collections.singletonMap(followerShardID.toString(),
"akka://test/user/" + followerShardID.toString()))
- .schemaContext(SCHEMA_CONTEXT).props()
+ .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
- final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterDataTreeChangeListenerReply.class);
+ final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterDataTreeNotificationListenerReply.class);
assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));