Reduce ShardDataTree#getDataTree() callsites
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
index bd4d8ec58855619704be9ce5dc595fe7896f26cf..144f0f5c9fc825917d139ca01b3ff0e989c39ab1 100644 (file)
@@ -1,3 +1,11 @@
+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
 package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.assertEquals;
@@ -10,8 +18,8 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
-
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
@@ -125,29 +133,6 @@ public class ShardTest extends AbstractShardTest {
 
     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
 
-    final CountDownLatch recoveryComplete = new CountDownLatch(1);
-
-    protected Props newShardPropsWithRecoveryComplete() {
-
-        final Creator<Shard> creator = new Creator<Shard>() {
-            @Override
-            public Shard create() throws Exception {
-                return new Shard(shardID, Collections.<String,String>emptyMap(),
-                        newDatastoreContext(), SCHEMA_CONTEXT) {
-                    @Override
-                    protected void onRecoveryComplete() {
-                        try {
-                            super.onRecoveryComplete();
-                        } finally {
-                            recoveryComplete.countDown();
-                        }
-                    }
-                };
-            }
-        };
-        return Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
-    }
-
     @Test
     public void testRegisterChangeListener() throws Exception {
         new ShardTestKit(getSystem()) {{
@@ -163,7 +148,7 @@ public class ShardTest extends AbstractShardTest {
                     "testRegisterChangeListener-DataChangeListener");
 
             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
-                    dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
+                    dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
 
             final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
                     RegisterChangeListenerReply.class);
@@ -243,7 +228,7 @@ public class ShardTest extends AbstractShardTest {
                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
 
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    Props.create(new DelegatingShardCreator(creator)),
+                    Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
                     "testRegisterChangeListenerWhenNotLeaderInitially");
 
             // Write initial data into the in-memory store.
@@ -256,7 +241,7 @@ public class ShardTest extends AbstractShardTest {
 
             // Now send the RegisterChangeListener and wait for the reply.
             shard.tell(new RegisterChangeListener(path, dclActor,
-                    AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
+                    AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
 
             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
                     RegisterChangeListenerReply.class);
@@ -354,18 +339,18 @@ public class ShardTest extends AbstractShardTest {
                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
 
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    Props.create(new DelegatingShardCreator(creator)),
+                    Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
 
             final YangInstanceIdentifier path = TestModel.TEST_PATH;
             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
             assertEquals("Got first ElectionTimeout", true,
-                    onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
+                onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
 
             shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                    RegisterDataTreeChangeListenerReply.class);
+                RegisterDataTreeChangeListenerReply.class);
             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
 
             shard.tell(new FindLeader(), getRef());
@@ -441,8 +426,8 @@ public class ShardTest extends AbstractShardTest {
                             newDatastoreContext(), SCHEMA_CONTEXT);
                 }
 
-                Map<String, String> getPeerAddresses() {
-                    return getRaftActorContext().getPeerAddresses();
+                String getPeerAddress(String id) {
+                    return getRaftActorContext().getPeerAddress(id);
                 }
 
                 @Override
@@ -463,15 +448,14 @@ public class ShardTest extends AbstractShardTest {
                         }
                     })), "testPeerAddressResolved");
 
-            //waitUntilLeader(shard);
             assertEquals("Recovery complete", true,
-                    Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
+                Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
 
             final String address = "akka://foobar";
             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
 
-            assertEquals("getPeerAddresses", address,
-                    ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
+            assertEquals("getPeerAddress", address,
+                ((TestShard) shard.underlyingActor()).getPeerAddress(shardID.toString()));
 
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
@@ -479,9 +463,14 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testApplySnapshot() throws Exception {
+
+        ShardTestKit testkit = new ShardTestKit(getSystem());
+
         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
                 "testApplySnapshot");
 
+        testkit.waitUntilLeader(shard);
+
         final DataTree store = InMemoryDataTreeFactory.getInstance().create();
         store.setSchemaContext(SCHEMA_CONTEXT);
 
@@ -510,8 +499,12 @@ public class ShardTest extends AbstractShardTest {
     @Test
     public void testApplyState() throws Exception {
 
+        ShardTestKit testkit = new ShardTestKit(getSystem());
+
         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
 
+        testkit.waitUntilLeader(shard);
+
         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
@@ -528,9 +521,11 @@ public class ShardTest extends AbstractShardTest {
     @Test
     public void testApplyStateWithCandidatePayload() throws Exception {
 
-        final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardPropsWithRecoveryComplete(), "testApplyState");
+        ShardTestKit testkit = new ShardTestKit(getSystem());
 
-        recoveryComplete.await(5,  TimeUnit.SECONDS);
+        final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
+
+        testkit.waitUntilLeader(shard);
 
         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
         final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
@@ -555,8 +550,8 @@ public class ShardTest extends AbstractShardTest {
         final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
 
         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
-                SerializationUtils.serializeNormalizedNode(root),
-                Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+            SerializationUtils.serializeNormalizedNode(root),
+            Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
         return testStore;
     }
 
@@ -598,7 +593,7 @@ public class ShardTest extends AbstractShardTest {
         }
 
         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
-                new ApplyJournalEntries(nListEntries));
+            new ApplyJournalEntries(nListEntries));
 
         testRecovery(listEntryKeys);
     }
@@ -614,8 +609,8 @@ public class ShardTest extends AbstractShardTest {
         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
 
         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
-                  new WriteModification(TestModel.OUTER_LIST_PATH,
-                          ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
+            new WriteModification(TestModel.OUTER_LIST_PATH,
+                ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
 
         final int nListEntries = 16;
         final Set<Integer> listEntryKeys = new HashSet<>();
@@ -940,12 +935,12 @@ public class ShardTest extends AbstractShardTest {
             // Send a couple more BatchedModifications.
 
             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
-                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
+                ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
             expectMsgClass(duration, BatchedModificationsReply.class);
 
             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
+                ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
 
             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
 
@@ -988,6 +983,44 @@ public class ShardTest extends AbstractShardTest {
         }};
     }
 
+    @Test
+    public void testBatchedModificationsWithOperationFailure() throws Throwable {
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testBatchedModificationsWithOperationFailure");
+
+            waitUntilLeader(shard);
+
+            // Test merge with invalid data. An exception should occur when the merge is applied. Note that
+            // write will not validate the children for performance reasons.
+
+            String transactionID = "tx1";
+
+            ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                    new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                        withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
+
+            BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, null);
+            batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
+            shard.tell(batched, getRef());
+            Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+
+            Throwable cause = failure.cause();
+
+            batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
+            batched.setReady(true);
+            batched.setTotalMessagesSent(2);
+
+            shard.tell(batched, getRef());
+
+            failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+            assertEquals("Failure cause", cause, failure.cause());
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
     @SuppressWarnings("unchecked")
     private static void verifyOuterListEntry(final TestActorRef<Shard> shard, final Object expIDValue) throws Exception {
         final NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
@@ -1149,7 +1182,7 @@ public class ShardTest extends AbstractShardTest {
 
             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
 
-            final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
+            final DataTreeModification modification = dataStore.newModification();
 
             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
@@ -1182,7 +1215,7 @@ public class ShardTest extends AbstractShardTest {
 
             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
 
-            final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
+            final DataTreeModification modification = dataStore.newModification();
 
             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
@@ -1234,7 +1267,7 @@ public class ShardTest extends AbstractShardTest {
             final MutableCompositeModification modification = new MutableCompositeModification();
             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
-                    TestModel.TEST_PATH, containerNode, modification);
+                TestModel.TEST_PATH, containerNode, modification);
 
             final FiniteDuration duration = duration("5 seconds");
 
@@ -1242,7 +1275,7 @@ public class ShardTest extends AbstractShardTest {
             // by the ShardTransaction.
 
             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                    cohort, modification, true, false), getRef());
+                cohort, modification, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
@@ -1525,7 +1558,7 @@ public class ShardTest extends AbstractShardTest {
 
             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
@@ -1647,7 +1680,7 @@ public class ShardTest extends AbstractShardTest {
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
 
             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort, modification, true, false), getRef());
+                cohort, modification, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
@@ -1792,12 +1825,12 @@ public class ShardTest extends AbstractShardTest {
                     modification, preCommit);
 
             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                    cohort, modification, true, false), getRef());
+                cohort, modification, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
@@ -1978,7 +2011,7 @@ public class ShardTest extends AbstractShardTest {
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
 
             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true, false), getRef());
+                cohort1, modification1, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             final String transactionID2 = "tx2";
@@ -1996,7 +2029,7 @@ public class ShardTest extends AbstractShardTest {
                     TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
 
             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
-                    cohort3, modification3, true, false), getRef());
+                cohort3, modification3, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
@@ -2052,7 +2085,7 @@ public class ShardTest extends AbstractShardTest {
             // Ready the third Tx.
 
             final String transactionID3 = "tx3";
-            final DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification();
+            final DataTreeModification modification3 = dataStore.newModification();
             new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
                     .apply(modification3);
                 modification3.ready();
@@ -2092,11 +2125,11 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testAbortTransaction() throws Throwable {
+    public void testAbortCurrentTransaction() throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testAbortTransaction");
+                    "testAbortCurrentTransaction");
 
             waitUntilLeader(shard);
 
@@ -2158,6 +2191,78 @@ public class ShardTest extends AbstractShardTest {
         }};
     }
 
+    @Test
+    public void testAbortQueuedTransaction() throws Throwable {
+        dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
+        new ShardTestKit(getSystem()) {{
+            final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
+            @SuppressWarnings("serial")
+            final Creator<Shard> creator = new Creator<Shard>() {
+                @Override
+                public Shard create() throws Exception {
+                    return new Shard(shardID, Collections.<String,String>emptyMap(),
+                            dataStoreContextBuilder.build(), SCHEMA_CONTEXT) {
+                        @Override
+                        public void onReceiveCommand(final Object message) throws Exception {
+                            super.onReceiveCommand(message);
+                            if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
+                                if(cleaupCheckLatch.get() != null) {
+                                    cleaupCheckLatch.get().countDown();
+                                }
+                            }
+                        }
+                    };
+                }
+            };
+
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    Props.create(new DelegatingShardCreator(creator)).withDispatcher(
+                            Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction");
+
+            waitUntilLeader(shard);
+
+            final String transactionID = "tx1";
+
+            final MutableCompositeModification modification = new MutableCompositeModification();
+            final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
+            doReturn(Futures.immediateFuture(null)).when(cohort).abort();
+
+            final FiniteDuration duration = duration("5 seconds");
+
+            // Ready the tx.
+
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
+
+            // Send the AbortTransaction message.
+
+            shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
+            expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
+
+            verify(cohort).abort();
+
+            // Verify the tx cohort is removed from queue at the cleanup check interval.
+
+            cleaupCheckLatch.set(new CountDownLatch(1));
+            assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
+                    cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
+
+            assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
+
+            // Now send CanCommitTransaction - should fail.
+
+            shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+
+            Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
+            assertTrue("Failure type", failure instanceof IllegalStateException);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
     @Test
     public void testCreateSnapshot() throws Exception {
         testCreateSnapshot(true, "testCreateSnapshot");
@@ -2306,7 +2411,7 @@ public class ShardTest extends AbstractShardTest {
                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
 
         final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
-                nonPersistentContext, SCHEMA_CONTEXT);
+            nonPersistentContext, SCHEMA_CONTEXT);
 
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
@@ -2342,12 +2447,12 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
 
             assertEquals("isRecoveryApplicable", false,
-                    shard.underlyingActor().persistence().isRecoveryApplicable());
+                shard.underlyingActor().persistence().isRecoveryApplicable());
 
             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
 
             assertEquals("isRecoveryApplicable", true,
-                    shard.underlyingActor().persistence().isRecoveryApplicable());
+                shard.underlyingActor().persistence().isRecoveryApplicable());
 
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
@@ -2371,11 +2476,11 @@ public class ShardTest extends AbstractShardTest {
                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
 
                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
-                        ShardLeaderStateChanged.class);
+                    ShardLeaderStateChanged.class);
                 assertEquals("getLocalShardDataTree present", true,
                         leaderStateChanged.getLocalShardDataTree().isPresent());
                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
-                        leaderStateChanged.getLocalShardDataTree().get());
+                    leaderStateChanged.getLocalShardDataTree().get());
 
                 MessageCollectorActor.clearMessages(listener);
 
@@ -2415,4 +2520,144 @@ public class ShardTest extends AbstractShardTest {
         store.validate(modification);
         store.commit(store.prepare(modification));
     }
+
+    @Test
+    public void testClusteredDataChangeListernerDelayedRegistration() throws Exception {
+        new ShardTestKit(getSystem()) {{
+            final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
+            final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
+            final Creator<Shard> creator = new Creator<Shard>() {
+                boolean firstElectionTimeout = true;
+
+                @Override
+                public Shard create() throws Exception {
+                    return new Shard(shardID, Collections.<String,String>emptyMap(),
+                        dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
+                        @Override
+                        public void onReceiveCommand(final Object message) throws Exception {
+                            if(message instanceof ElectionTimeout && firstElectionTimeout) {
+                                firstElectionTimeout = false;
+                                final ActorRef self = getSelf();
+                                new Thread() {
+                                    @Override
+                                    public void run() {
+                                        Uninterruptibles.awaitUninterruptibly(
+                                            onChangeListenerRegistered, 5, TimeUnit.SECONDS);
+                                        self.tell(message, self);
+                                    }
+                                }.start();
+
+                                onFirstElectionTimeout.countDown();
+                            } else {
+                                super.onReceiveCommand(message);
+                            }
+                        }
+                    };
+                }
+            };
+
+            final MockDataChangeListener listener = new MockDataChangeListener(1);
+            final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
+                "testDataChangeListenerOnFollower-DataChangeListener");
+
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()).
+                    withDispatcher(Dispatchers.DefaultDispatcherId()),"testDataChangeListenerOnFollower");
+
+            assertEquals("Got first ElectionTimeout", true,
+                onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
+
+            shard.tell(new FindLeader(), getRef());
+            final FindLeaderReply findLeadeReply =
+                expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
+            assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
+
+            final YangInstanceIdentifier path = TestModel.TEST_PATH;
+
+            shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
+            final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+                RegisterChangeListenerReply.class);
+            assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+
+            writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+            onChangeListenerRegistered.countDown();
+
+            listener.waitForChangeEvents();
+
+            dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
+    @Test
+    public void testClusteredDataChangeListernerRegistration() throws Exception {
+        new ShardTestKit(getSystem()) {{
+            final ShardIdentifier member1ShardID = ShardIdentifier.builder().memberName("member-1")
+                .shardName("inventory").type("config").build();
+
+            final ShardIdentifier member2ShardID = ShardIdentifier.builder().memberName("member-2")
+                .shardName("inventory").type("config").build();
+            final Creator<Shard> followerShardCreator = new Creator<Shard>() {
+
+                @Override
+                public Shard create() throws Exception {
+                    return new Shard(member1ShardID, Collections.singletonMap(member2ShardID.toString(),
+                        "akka://test/user/" + member2ShardID.toString()),
+                        dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
+                        @Override
+                        public void onReceiveCommand(final Object message) throws Exception {
+
+                            if(!(message instanceof ElectionTimeout)) {
+                                super.onReceiveCommand(message);
+                            }
+                        }
+                    };
+                }
+            };
+
+            final Creator<Shard> leaderShardCreator = new Creator<Shard>() {
+
+                @Override
+                public Shard create() throws Exception {
+                    return new Shard(member2ShardID, Collections.singletonMap(member1ShardID.toString(),
+                        "akka://test/user/" + member1ShardID.toString()),
+                        dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { };
+                }
+            };
+
+
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                Props.create(new DelegatingShardCreator(followerShardCreator)),
+                member1ShardID.toString());
+
+            final TestActorRef<Shard> shardLeader = TestActorRef.create(getSystem(),
+                Props.create(new DelegatingShardCreator(leaderShardCreator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                member2ShardID.toString());
+            // Sleep to let election happen
+            Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+
+            shard.tell(new FindLeader(), getRef());
+            final FindLeaderReply findLeaderReply =
+                    expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
+            assertEquals("Shard leader does not match", shardLeader.path().toString(), findLeaderReply.getLeaderActor());
+
+            final YangInstanceIdentifier path = TestModel.TEST_PATH;
+            final MockDataChangeListener listener = new MockDataChangeListener(1);
+            final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
+                "testDataChangeListenerOnFollower-DataChangeListener");
+
+            shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
+            final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+                RegisterChangeListenerReply.class);
+            assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+
+            writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+            listener.waitForChangeEvents();
+
+            dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
 }