- public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
- // This test tests the timing window in which a change listener is registered before the
- // shard becomes the leader. We verify that the listener is registered and notified of the
- // existing data when the shard becomes the leader.
- new ShardTestKit(getSystem()) {{
- // For this test, we want to send the RegisterChangeListener message after the shard
- // has recovered from persistence and before it becomes the leader. So we subclass
- // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
- // we know that the shard has been initialized to a follower and has started the
- // election process. The following 2 CountDownLatches are used to coordinate the
- // ElectionTimeout with the sending of the RegisterChangeListener message.
- final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
- final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
- Creator<Shard> creator = new Creator<Shard>() {
- boolean firstElectionTimeout = true;
-
- @Override
- public Shard create() throws Exception {
- // Use a non persistent provider because this test actually invokes persist on the journal
- // this will cause all other messages to not be queued properly after that.
- // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
- // it does do a persist)
- return new Shard(shardID, Collections.<String,String>emptyMap(),
- dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
- @Override
- public void onReceiveCommand(final Object message) throws Exception {
- if(message instanceof ElectionTimeout && firstElectionTimeout) {
- // Got the first ElectionTimeout. We don't forward it to the
- // base Shard yet until we've sent the RegisterChangeListener
- // message. So we signal the onFirstElectionTimeout latch to tell
- // the main thread to send the RegisterChangeListener message and
- // start a thread to wait on the onChangeListenerRegistered latch,
- // which the main thread signals after it has sent the message.
- // After the onChangeListenerRegistered is triggered, we send the
- // original ElectionTimeout message to proceed with the election.
- firstElectionTimeout = false;
- final ActorRef self = getSelf();
- new Thread() {
- @Override
- public void run() {
- Uninterruptibles.awaitUninterruptibly(
- onChangeListenerRegistered, 5, TimeUnit.SECONDS);
- self.tell(message, self);
- }
- }.start();
-
- onFirstElectionTimeout.countDown();
- } else {
- super.onReceiveCommand(message);
- }
- }
- };
- }
- };
-
- MockDataChangeListener listener = new MockDataChangeListener(1);
- ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
- "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
-
- TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- Props.create(new DelegatingShardCreator(creator)),
- "testRegisterChangeListenerWhenNotLeaderInitially");
-
- // Write initial data into the in-memory store.
- YangInstanceIdentifier path = TestModel.TEST_PATH;
- writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
- // Wait until the shard receives the first ElectionTimeout message.
- assertEquals("Got first ElectionTimeout", true,
- onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
-
- // Now send the RegisterChangeListener and wait for the reply.
- shard.tell(new RegisterChangeListener(path, dclActor,
- AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
-
- RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterChangeListenerReply.class);
- assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
-
- // Sanity check - verify the shard is not the leader yet.
- shard.tell(new FindLeader(), getRef());
- FindLeaderReply findLeadeReply =
- expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
- assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
-
- // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
- // with the election process.
- onChangeListenerRegistered.countDown();