+ @SuppressWarnings("serial")
+ @Test
+ public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
+ // This test tests the timing window in which a change listener is registered before the
+ // shard becomes the leader. We verify that the listener is registered and notified of the
+ // existing data when the shard becomes the leader.
+ // For this test, we want to send the RegisterChangeListener message after the shard
+ // has recovered from persistence and before it becomes the leader. So we subclass
+ // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
+ // we know that the shard has been initialized to a follower and has started the
+ // election process. The following 2 CountDownLatches are used to coordinate the
+ // ElectionTimeout with the sending of the RegisterChangeListener message.
+ final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
+ final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
+ final Creator<Shard> creator = new Creator<Shard>() {
+ boolean firstElectionTimeout = true;
+
+ @Override
+ public Shard create() throws Exception {
+ // Use a non persistent provider because this test actually invokes persist on the journal
+ // this will cause all other messages to not be queued properly after that.
+ // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
+ // it does do a persist)
+ return new Shard(newShardBuilder()) {
+ @Override
+ public void handleCommand(final Object message) {
+ if (message instanceof ElectionTimeout && firstElectionTimeout) {
+ // Got the first ElectionTimeout. We don't forward it to the
+ // base Shard yet until we've sent the RegisterChangeListener
+ // message. So we signal the onFirstElectionTimeout latch to tell
+ // the main thread to send the RegisterChangeListener message and
+ // start a thread to wait on the onChangeListenerRegistered latch,
+ // which the main thread signals after it has sent the message.
+ // After the onChangeListenerRegistered is triggered, we send the
+ // original ElectionTimeout message to proceed with the election.
+ firstElectionTimeout = false;
+ final ActorRef self = getSelf();
+ new Thread() {
+ @Override
+ public void run() {
+ Uninterruptibles.awaitUninterruptibly(
+ onChangeListenerRegistered, 5, TimeUnit.SECONDS);
+ self.tell(message, self);
+ }
+ }.start();
+
+ onFirstElectionTimeout.countDown();
+ } else {
+ super.handleCommand(message);
+ }
+ }
+ };
+ }
+ };
+
+ setupInMemorySnapshotStore();
+
+ final MockDataChangeListener listener = new MockDataChangeListener(1);
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+ "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
+
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testRegisterChangeListenerWhenNotLeaderInitially");
+
+ new ShardTestKit(getSystem()) {
+ {
+ final YangInstanceIdentifier path = TestModel.TEST_PATH;
+
+ // Wait until the shard receives the first ElectionTimeout
+ // message.
+ assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
+
+ // Now send the RegisterChangeListener and wait for the reply.
+ shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.SUBTREE, false),
+ getRef());
+
+ final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterChangeListenerReply.class);
+ assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+ // Sanity check - verify the shard is not the leader yet.
+ shard.tell(FindLeader.INSTANCE, getRef());
+ final FindLeaderReply findLeadeReply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
+ assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
+
+ // Signal the onChangeListenerRegistered latch to tell the
+ // thread above to proceed
+ // with the election process.
+ onChangeListenerRegistered.countDown();
+
+ // Wait for the shard to become the leader and notify our
+ // listener with the existing
+ // data in the store.
+ listener.waitForChangeEvents(path);
+ }
+ };