- public void testRegisterChangeListener() throws Exception {
- new ShardTestKit(getSystem()) {
- {
- final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testRegisterChangeListener");
-
- waitUntilLeader(shard);
-
- shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
-
- final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
- "testRegisterChangeListener-DataChangeListener");
-
- shard.tell(new RegisterChangeListener(TestModel.TEST_PATH, dclActor,
- AsyncDataBroker.DataChangeScope.BASE, true), getRef());
-
- final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
- RegisterChangeListenerReply.class);
- final String replyPath = reply.getListenerRegistrationPath().toString();
- assertTrue("Incorrect reply path: " + replyPath,
- replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
-
- final YangInstanceIdentifier path = TestModel.TEST_PATH;
- writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
- listener.waitForChangeEvents(path);
- }
- };
- }
-
- @SuppressWarnings("serial")
- @Test
- public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
- // This test tests the timing window in which a change listener is registered before the
- // shard becomes the leader. We verify that the listener is registered and notified of the
- // existing data when the shard becomes the leader.
- // For this test, we want to send the RegisterChangeListener message after the shard
- // has recovered from persistence and before it becomes the leader. So we subclass
- // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
- // we know that the shard has been initialized to a follower and has started the
- // election process. The following 2 CountDownLatches are used to coordinate the
- // ElectionTimeout with the sending of the RegisterChangeListener message.
- final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
- final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
- final Creator<Shard> creator = new Creator<Shard>() {
- boolean firstElectionTimeout = true;
-
- @Override
- public Shard create() throws Exception {
- // Use a non persistent provider because this test actually invokes persist on the journal
- // this will cause all other messages to not be queued properly after that.
- // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
- // it does do a persist)
- return new Shard(newShardBuilder()) {
- @Override
- public void handleCommand(final Object message) {
- if (message instanceof ElectionTimeout && firstElectionTimeout) {
- // Got the first ElectionTimeout. We don't forward it to the
- // base Shard yet until we've sent the RegisterChangeListener
- // message. So we signal the onFirstElectionTimeout latch to tell
- // the main thread to send the RegisterChangeListener message and
- // start a thread to wait on the onChangeListenerRegistered latch,
- // which the main thread signals after it has sent the message.
- // After the onChangeListenerRegistered is triggered, we send the
- // original ElectionTimeout message to proceed with the election.
- firstElectionTimeout = false;
- final ActorRef self = getSelf();
- new Thread() {
- @Override
- public void run() {
- Uninterruptibles.awaitUninterruptibly(
- onChangeListenerRegistered, 5, TimeUnit.SECONDS);
- self.tell(message, self);
- }
- }.start();
-
- onFirstElectionTimeout.countDown();
- } else {
- super.handleCommand(message);
- }
- }
- };
- }
- };