+
+ YangInstanceIdentifier path = TestModel.TEST_PATH;
+ writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ listener.waitForChangeEvents(path);
+
+ dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @SuppressWarnings("serial")
+ @Test
+ public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
+ // This test tests the timing window in which a change listener is registered before the
+ // shard becomes the leader. We verify that the listener is registered and notified of the
+ // existing data when the shard becomes the leader.
+ new ShardTestKit(getSystem()) {{
+ // For this test, we want to send the RegisterChangeListener message after the shard
+ // has recovered from persistence and before it becomes the leader. So we subclass
+ // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
+ // we know that the shard has been initialized to a follower and has started the
+ // election process. The following 2 CountDownLatches are used to coordinate the
+ // ElectionTimeout with the sending of the RegisterChangeListener message.
+ final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
+ final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
+ Creator<Shard> creator = new Creator<Shard>() {
+ boolean firstElectionTimeout = true;
+
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+ newDatastoreContext(), SCHEMA_CONTEXT) {
+ @Override
+ public void onReceiveCommand(final Object message) throws Exception {
+ if(message instanceof ElectionTimeout && firstElectionTimeout) {
+ // Got the first ElectionTimeout. We don't forward it to the
+ // base Shard yet until we've sent the RegisterChangeListener
+ // message. So we signal the onFirstElectionTimeout latch to tell
+ // the main thread to send the RegisterChangeListener message and
+ // start a thread to wait on the onChangeListenerRegistered latch,
+ // which the main thread signals after it has sent the message.
+ // After the onChangeListenerRegistered is triggered, we send the
+ // original ElectionTimeout message to proceed with the election.
+ firstElectionTimeout = false;
+ final ActorRef self = getSelf();
+ new Thread() {
+ @Override
+ public void run() {
+ Uninterruptibles.awaitUninterruptibly(
+ onChangeListenerRegistered, 5, TimeUnit.SECONDS);
+ self.tell(message, self);
+ }
+ }.start();
+
+ onFirstElectionTimeout.countDown();
+ } else {
+ super.onReceiveCommand(message);
+ }
+ }
+ };
+ }
+ };
+
+ MockDataChangeListener listener = new MockDataChangeListener(1);
+ ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
+ "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
+
+ TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ Props.create(new DelegatingShardCreator(creator)),
+ "testRegisterChangeListenerWhenNotLeaderInitially");
+
+ // Write initial data into the in-memory store.
+ YangInstanceIdentifier path = TestModel.TEST_PATH;
+ writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ // Wait until the shard receives the first ElectionTimeout message.
+ assertEquals("Got first ElectionTimeout", true,
+ onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
+
+ // Now send the RegisterChangeListener and wait for the reply.
+ shard.tell(new RegisterChangeListener(path, dclActor.path(),
+ AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
+
+ RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterChangeListenerReply.class);
+ assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+ // Sanity check - verify the shard is not the leader yet.
+ shard.tell(new FindLeader(), getRef());
+ FindLeaderReply findLeadeReply =
+ expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
+ assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
+
+ // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
+ // with the election process.
+ onChangeListenerRegistered.countDown();
+
+ // Wait for the shard to become the leader and notify our listener with the existing
+ // data in the store.
+ listener.waitForChangeEvents(path);
+
+ dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());