+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
package org.opendaylight.controller.cluster.datastore;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
-
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
- final CountDownLatch recoveryComplete = new CountDownLatch(1);
-
- protected Props newShardPropsWithRecoveryComplete() {
-
- final Creator<Shard> creator = new Creator<Shard>() {
- @Override
- public Shard create() throws Exception {
- return new Shard(shardID, Collections.<String,String>emptyMap(),
- newDatastoreContext(), SCHEMA_CONTEXT) {
- @Override
- protected void onRecoveryComplete() {
- try {
- super.onRecoveryComplete();
- } finally {
- recoveryComplete.countDown();
- }
- }
- };
- }
- };
- return Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
- }
-
@Test
public void testRegisterChangeListener() throws Exception {
new ShardTestKit(getSystem()) {{
"testRegisterChangeListener-DataChangeListener");
shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
- dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
+ dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
RegisterChangeListenerReply.class);
"testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- Props.create(new DelegatingShardCreator(creator)),
+ Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
"testRegisterChangeListenerWhenNotLeaderInitially");
// Write initial data into the in-memory store.
// Now send the RegisterChangeListener and wait for the reply.
shard.tell(new RegisterChangeListener(path, dclActor,
- AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
+ AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
RegisterChangeListenerReply.class);
"testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- Props.create(new DelegatingShardCreator(creator)),
+ Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
"testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
final YangInstanceIdentifier path = TestModel.TEST_PATH;
writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
assertEquals("Got first ElectionTimeout", true,
- onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
+ onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterDataTreeChangeListenerReply.class);
+ RegisterDataTreeChangeListenerReply.class);
assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
shard.tell(new FindLeader(), getRef());
newDatastoreContext(), SCHEMA_CONTEXT);
}
- Map<String, String> getPeerAddresses() {
- return getRaftActorContext().getPeerAddresses();
+ String getPeerAddress(String id) {
+ return getRaftActorContext().getPeerAddress(id);
}
@Override
}
})), "testPeerAddressResolved");
- //waitUntilLeader(shard);
assertEquals("Recovery complete", true,
- Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
+ Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
final String address = "akka://foobar";
shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
- assertEquals("getPeerAddresses", address,
- ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
+ assertEquals("getPeerAddress", address,
+ ((TestShard) shard.underlyingActor()).getPeerAddress(shardID.toString()));
shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
@Test
public void testApplySnapshot() throws Exception {
+
+ ShardTestKit testkit = new ShardTestKit(getSystem());
+
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
"testApplySnapshot");
+ testkit.waitUntilLeader(shard);
+
final DataTree store = InMemoryDataTreeFactory.getInstance().create();
store.setSchemaContext(SCHEMA_CONTEXT);
@Test
public void testApplyState() throws Exception {
+ ShardTestKit testkit = new ShardTestKit(getSystem());
+
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
+ testkit.waitUntilLeader(shard);
+
final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
@Test
public void testApplyStateWithCandidatePayload() throws Exception {
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardPropsWithRecoveryComplete(), "testApplyState");
+ ShardTestKit testkit = new ShardTestKit(getSystem());
- recoveryComplete.await(5, TimeUnit.SECONDS);
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
+
+ testkit.waitUntilLeader(shard);
final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
- SerializationUtils.serializeNormalizedNode(root),
- Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+ SerializationUtils.serializeNormalizedNode(root),
+ Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
return testStore;
}
}
InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
- new ApplyJournalEntries(nListEntries));
+ new ApplyJournalEntries(nListEntries));
testRecovery(listEntryKeys);
}
InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
- new WriteModification(TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
+ new WriteModification(TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
final int nListEntries = 16;
final Set<Integer> listEntryKeys = new HashSet<>();
// Send a couple more BatchedModifications.
shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
expectMsgClass(duration, BatchedModificationsReply.class);
shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
}};
}
+ @Test
+ public void testBatchedModificationsWithOperationFailure() throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testBatchedModificationsWithOperationFailure");
+
+ waitUntilLeader(shard);
+
+ // Test merge with invalid data. An exception should occur when the merge is applied. Note that
+ // write will not validate the children for performance reasons.
+
+ String transactionID = "tx1";
+
+ ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
+
+ BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, null);
+ batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
+ shard.tell(batched, getRef());
+ Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+
+ Throwable cause = failure.cause();
+
+ batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
+ batched.setReady(true);
+ batched.setTotalMessagesSent(2);
+
+ shard.tell(batched, getRef());
+
+ failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+ assertEquals("Failure cause", cause, failure.cause());
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
@SuppressWarnings("unchecked")
private static void verifyOuterListEntry(final TestActorRef<Shard> shard, final Object expIDValue) throws Exception {
final NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
- final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
+ final DataTreeModification modification = dataStore.newModification();
final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
- final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
+ final DataTreeModification modification = dataStore.newModification();
final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
final MutableCompositeModification modification = new MutableCompositeModification();
final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
- TestModel.TEST_PATH, containerNode, modification);
+ TestModel.TEST_PATH, containerNode, modification);
final FiniteDuration duration = duration("5 seconds");
// by the ShardTransaction.
shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true, false), getRef());
+ cohort, modification, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
// Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort, modification, true, false), getRef());
+ cohort, modification, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
modification, preCommit);
shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true, false), getRef());
+ cohort, modification, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true, false), getRef());
+ cohort1, modification1, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
final String transactionID2 = "tx2";
TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
- cohort3, modification3, true, false), getRef());
+ cohort3, modification3, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// All Tx's are readied. We'll send canCommit for the last one but not the others. The others
// Ready the third Tx.
final String transactionID3 = "tx3";
- final DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification();
+ final DataTreeModification modification3 = dataStore.newModification();
new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
.apply(modification3);
modification3.ready();
}
@Test
- public void testAbortTransaction() throws Throwable {
+ public void testAbortCurrentTransaction() throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testAbortTransaction");
+ "testAbortCurrentTransaction");
waitUntilLeader(shard);
}};
}
+ @Test
+ public void testAbortQueuedTransaction() throws Throwable {
+ dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
+ new ShardTestKit(getSystem()) {{
+ final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
+ @SuppressWarnings("serial")
+ final Creator<Shard> creator = new Creator<Shard>() {
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(shardID, Collections.<String,String>emptyMap(),
+ dataStoreContextBuilder.build(), SCHEMA_CONTEXT) {
+ @Override
+ public void onReceiveCommand(final Object message) throws Exception {
+ super.onReceiveCommand(message);
+ if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
+ if(cleaupCheckLatch.get() != null) {
+ cleaupCheckLatch.get().countDown();
+ }
+ }
+ }
+ };
+ }
+ };
+
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ Props.create(new DelegatingShardCreator(creator)).withDispatcher(
+ Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction");
+
+ waitUntilLeader(shard);
+
+ final String transactionID = "tx1";
+
+ final MutableCompositeModification modification = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
+ doReturn(Futures.immediateFuture(null)).when(cohort).abort();
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ // Ready the tx.
+
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+ cohort, modification, true, false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
+
+ // Send the AbortTransaction message.
+
+ shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
+ expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
+
+ verify(cohort).abort();
+
+ // Verify the tx cohort is removed from queue at the cleanup check interval.
+
+ cleaupCheckLatch.set(new CountDownLatch(1));
+ assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
+ cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
+
+ assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
+
+ // Now send CanCommitTransaction - should fail.
+
+ shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+
+ Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
+ assertTrue("Failure type", failure instanceof IllegalStateException);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
@Test
public void testCreateSnapshot() throws Exception {
testCreateSnapshot(true, "testCreateSnapshot");
shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
- nonPersistentContext, SCHEMA_CONTEXT);
+ nonPersistentContext, SCHEMA_CONTEXT);
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
assertEquals("isRecoveryApplicable", false,
- shard.underlyingActor().persistence().isRecoveryApplicable());
+ shard.underlyingActor().persistence().isRecoveryApplicable());
shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
assertEquals("isRecoveryApplicable", true,
- shard.underlyingActor().persistence().isRecoveryApplicable());
+ shard.underlyingActor().persistence().isRecoveryApplicable());
shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
- ShardLeaderStateChanged.class);
+ ShardLeaderStateChanged.class);
assertEquals("getLocalShardDataTree present", true,
leaderStateChanged.getLocalShardDataTree().isPresent());
assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
- leaderStateChanged.getLocalShardDataTree().get());
+ leaderStateChanged.getLocalShardDataTree().get());
MessageCollectorActor.clearMessages(listener);
store.validate(modification);
store.commit(store.prepare(modification));
}
+
+ @Test
+ public void testClusteredDataChangeListernerDelayedRegistration() throws Exception {
+ new ShardTestKit(getSystem()) {{
+ final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
+ final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
+ final Creator<Shard> creator = new Creator<Shard>() {
+ boolean firstElectionTimeout = true;
+
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(shardID, Collections.<String,String>emptyMap(),
+ dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
+ @Override
+ public void onReceiveCommand(final Object message) throws Exception {
+ if(message instanceof ElectionTimeout && firstElectionTimeout) {
+ firstElectionTimeout = false;
+ final ActorRef self = getSelf();
+ new Thread() {
+ @Override
+ public void run() {
+ Uninterruptibles.awaitUninterruptibly(
+ onChangeListenerRegistered, 5, TimeUnit.SECONDS);
+ self.tell(message, self);
+ }
+ }.start();
+
+ onFirstElectionTimeout.countDown();
+ } else {
+ super.onReceiveCommand(message);
+ }
+ }
+ };
+ }
+ };
+
+ final MockDataChangeListener listener = new MockDataChangeListener(1);
+ final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
+ "testDataChangeListenerOnFollower-DataChangeListener");
+
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()).
+ withDispatcher(Dispatchers.DefaultDispatcherId()),"testDataChangeListenerOnFollower");
+
+ assertEquals("Got first ElectionTimeout", true,
+ onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
+
+ shard.tell(new FindLeader(), getRef());
+ final FindLeaderReply findLeadeReply =
+ expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
+ assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
+
+ final YangInstanceIdentifier path = TestModel.TEST_PATH;
+
+ shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
+ final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterChangeListenerReply.class);
+ assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+
+ writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ onChangeListenerRegistered.countDown();
+
+ listener.waitForChangeEvents();
+
+ dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testClusteredDataChangeListernerRegistration() throws Exception {
+ new ShardTestKit(getSystem()) {{
+ final ShardIdentifier member1ShardID = ShardIdentifier.builder().memberName("member-1")
+ .shardName("inventory").type("config").build();
+
+ final ShardIdentifier member2ShardID = ShardIdentifier.builder().memberName("member-2")
+ .shardName("inventory").type("config").build();
+ final Creator<Shard> followerShardCreator = new Creator<Shard>() {
+
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(member1ShardID, Collections.singletonMap(member2ShardID.toString(),
+ "akka://test/user/" + member2ShardID.toString()),
+ dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
+ @Override
+ public void onReceiveCommand(final Object message) throws Exception {
+
+ if(!(message instanceof ElectionTimeout)) {
+ super.onReceiveCommand(message);
+ }
+ }
+ };
+ }
+ };
+
+ final Creator<Shard> leaderShardCreator = new Creator<Shard>() {
+
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(member2ShardID, Collections.singletonMap(member1ShardID.toString(),
+ "akka://test/user/" + member1ShardID.toString()),
+ dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { };
+ }
+ };
+
+
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ Props.create(new DelegatingShardCreator(followerShardCreator)),
+ member1ShardID.toString());
+
+ final TestActorRef<Shard> shardLeader = TestActorRef.create(getSystem(),
+ Props.create(new DelegatingShardCreator(leaderShardCreator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ member2ShardID.toString());
+ // Sleep to let election happen
+ Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+
+ shard.tell(new FindLeader(), getRef());
+ final FindLeaderReply findLeaderReply =
+ expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
+ assertEquals("Shard leader does not match", shardLeader.path().toString(), findLeaderReply.getLeaderActor());
+
+ final YangInstanceIdentifier path = TestModel.TEST_PATH;
+ final MockDataChangeListener listener = new MockDataChangeListener(1);
+ final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
+ "testDataChangeListenerOnFollower-DataChangeListener");
+
+ shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
+ final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterChangeListenerReply.class);
+ assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+
+ writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ listener.waitForChangeEvents();
+
+ dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
}