import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
CreateTransactionReply.class);
final String path = reply.getTransactionPath().toString();
- assertTrue("Unexpected transaction path " + path, path
- .startsWith("akka://test/user/testCreateTransaction/shard-member-1:ShardTransactionTest@0:"));
+ assertTrue("Unexpected transaction path " + path, path.startsWith(String.format(
+ "akka://test/user/testCreateTransaction/shard-%s-%s:ShardTransactionTest@0:",
+ shardID.getShardName(), shardID.getMemberName().getName())));
}
};
}
CreateTransactionReply.class);
final String path = reply.getTransactionPath().toString();
- assertTrue("Unexpected transaction path " + path, path.startsWith(
- "akka://test/user/testCreateTransactionOnChain/shard-member-1:ShardTransactionTest@0:"));
+ assertTrue("Unexpected transaction path " + path, path.startsWith(String.format(
+ "akka://test/user/testCreateTransactionOnChain/shard-%s-%s:ShardTransactionTest@0:",
+ shardID.getShardName(), shardID.getMemberName().getName())));
}
};
}
final ReadyTransactionReply readyReply = ReadyTransactionReply
.fromSerializable(expectMsgClass(duration, ReadyTransactionReply.class));
assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
-
// Send the CanCommitTransaction message for the first Tx.
shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
.shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext)
- .schemaContext(SCHEMA_CONTEXT).props();
+ .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder()
.shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext)
- .schemaContext(SCHEMA_CONTEXT).props();
+ .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
new ShardTestKit(getSystem()) {
{
}
@Test
- public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
+ public void testClusteredDataChangeListenerWithDelayedRegistration() throws Exception {
new ShardTestKit(getSystem()) {
{
- final String testName = "testClusteredDataChangeListenerDelayedRegistration";
+ final String testName = "testClusteredDataChangeListenerWithDelayedRegistration";
dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
.datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
.peerAddresses(Collections.singletonMap(leaderShardID.toString(),
"akka://test/user/" + leaderShardID.toString()))
- .schemaContext(SCHEMA_CONTEXT).props()
+ .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
final TestActorRef<Shard> leaderShard = actorFactory
.createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
.peerAddresses(Collections.singletonMap(followerShardID.toString(),
"akka://test/user/" + followerShardID.toString()))
- .schemaContext(SCHEMA_CONTEXT).props()
+ .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
}
@Test
- public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
+ public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception {
new ShardTestKit(getSystem()) {
{
- final String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
+ final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistration";
dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
};
}
+ @Test
+ public void testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed() throws Exception {
+ new ShardTestKit(getSystem()) {
+ {
+ final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed";
+ dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
+ .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+
+ final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(0);
+ final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
+ TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
+
+ setupInMemorySnapshotStore();
+
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(testName + "-shard"));
+
+ waitUntilNoLeader(shard);
+
+ shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
+ final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterDataTreeChangeListenerReply.class);
+ assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+ final ActorSelection regActor = getSystem().actorSelection(reply.getListenerRegistrationPath());
+ regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), getRef());
+ expectMsgClass(CloseDataTreeNotificationListenerRegistrationReply.class);
+
+ shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
+ .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
+
+ listener.expectNoMoreChanges("Received unexpected change after close");
+ }
+ };
+ }
+
@Test
public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
new ShardTestKit(getSystem()) {
.datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
.peerAddresses(Collections.singletonMap(leaderShardID.toString(),
"akka://test/user/" + leaderShardID.toString()))
- .schemaContext(SCHEMA_CONTEXT).props()
+ .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
final TestActorRef<Shard> leaderShard = actorFactory
.createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
.peerAddresses(Collections.singletonMap(followerShardID.toString(),
"akka://test/user/" + followerShardID.toString()))
- .schemaContext(SCHEMA_CONTEXT).props()
+ .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());