import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.mockito.InOrder;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
"testRegisterChangeListener-DataChangeListener");
shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
- dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
+ dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
RegisterChangeListenerReply.class);
// this will cause all other messages to not be queued properly after that.
// The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
// it does do a persist)
- return new Shard(shardID, Collections.<String,String>emptyMap(),
- dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
+ return new Shard(newShardBuilder()) {
@Override
public void onReceiveCommand(final Object message) throws Exception {
if(message instanceof ElectionTimeout && firstElectionTimeout) {
"testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- Props.create(new DelegatingShardCreator(creator)),
+ Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
"testRegisterChangeListenerWhenNotLeaderInitially");
// Write initial data into the in-memory store.
// Now send the RegisterChangeListener and wait for the reply.
shard.tell(new RegisterChangeListener(path, dclActor,
- AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
+ AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
RegisterChangeListenerReply.class);
final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
"testRegisterDataTreeChangeListener-DataTreeChangeListener");
- shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
+ shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
RegisterDataTreeChangeListenerReply.class);
@Override
public Shard create() throws Exception {
- return new Shard(shardID, Collections.<String,String>emptyMap(),
- dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
+ return new Shard(Shard.builder().id(shardID).datastoreContext(
+ dataStoreContextBuilder.persistent(false).build()).schemaContext(SCHEMA_CONTEXT)) {
@Override
public void onReceiveCommand(final Object message) throws Exception {
if(message instanceof ElectionTimeout && firstElectionTimeout) {
"testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- Props.create(new DelegatingShardCreator(creator)),
+ Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
"testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
final YangInstanceIdentifier path = TestModel.TEST_PATH;
writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
assertEquals("Got first ElectionTimeout", true,
- onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
+ onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
- shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
+ shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterDataTreeChangeListenerReply.class);
+ RegisterDataTreeChangeListenerReply.class);
assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
shard.tell(new FindLeader(), getRef());
final CountDownLatch recoveryComplete = new CountDownLatch(1);
class TestShard extends Shard {
TestShard() {
- super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
- newDatastoreContext(), SCHEMA_CONTEXT);
+ super(Shard.builder().id(shardID).datastoreContext(newDatastoreContext()).
+ peerAddresses(Collections.<String, String>singletonMap(shardID.toString(), null)).
+ schemaContext(SCHEMA_CONTEXT));
}
- Map<String, String> getPeerAddresses() {
- return getRaftActorContext().getPeerAddresses();
+ String getPeerAddress(String id) {
+ return getRaftActorContext().getPeerAddress(id);
}
@Override
}
})), "testPeerAddressResolved");
- //waitUntilLeader(shard);
assertEquals("Recovery complete", true,
- Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
+ Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
final String address = "akka://foobar";
shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
- assertEquals("getPeerAddresses", address,
- ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
+ assertEquals("getPeerAddress", address,
+ ((TestShard) shard.underlyingActor()).getPeerAddress(shardID.toString()));
shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
testkit.waitUntilLeader(shard);
- final DataTree store = InMemoryDataTreeFactory.getInstance().create();
+ final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
store.setSchemaContext(SCHEMA_CONTEXT);
final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
}
DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
- final DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
+ final DataTree testStore = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
testStore.setSchemaContext(SCHEMA_CONTEXT);
writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
}
InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
- new ApplyJournalEntries(nListEntries));
+ new ApplyJournalEntries(nListEntries));
testRecovery(listEntryKeys);
}
InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
- new WriteModification(TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
+ new WriteModification(TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
final int nListEntries = 16;
final Set<Integer> listEntryKeys = new HashSet<>();
final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
final Timeout timeout = new Timeout(duration);
- // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
- // by the ShardTransaction.
-
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
expectMsgClass(duration, ReadyTransactionReply.class));
assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
- // Send the ForwardedReadyTransaction for the next 2 Tx's.
-
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
- cohort3, modification3, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
// Send a couple more BatchedModifications.
shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
expectMsgClass(duration, BatchedModificationsReply.class);
shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
@Override
public Shard create() throws Exception {
- return new Shard(shardID, Collections.<String,String>emptyMap(),
- newDatastoreContext(), SCHEMA_CONTEXT) {
+ return new Shard(newShardBuilder()) {
@Override
protected boolean isLeader() {
return overrideLeaderCalls.get() ? false : super.isLeader();
}
@Test
- public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
+ public void testTransactionMessagesWithNoLeader() {
+ new ShardTestKit(getSystem()) {{
+ dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).
+ shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testTransactionMessagesWithNoLeader");
+
+ waitUntilNoLeader(shard);
+
+ shard.tell(new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, ""), getRef());
+ Failure failure = expectMsgClass(Failure.class);
+ assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
+
+ shard.tell(prepareForwardedReadyTransaction(mock(ShardDataTreeCohort.class), "tx",
+ DataStoreVersions.CURRENT_VERSION, true), getRef());
+ failure = expectMsgClass(Failure.class);
+ assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
+
+ shard.tell(new ReadyLocalTransaction("tx", mock(DataTreeModification.class), true), getRef());
+ failure = expectMsgClass(Failure.class);
+ assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
+ }};
+ }
+
+ @Test
+ public void testReadyWithImmediateCommit() throws Exception{
+ testReadyWithImmediateCommit(true);
+ testReadyWithImmediateCommit(false);
+ }
+
+ public void testReadyWithImmediateCommit(final boolean readWrite) throws Exception{
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testForwardedReadyTransactionWithImmediateCommit");
+ "testReadyWithImmediateCommit-" + readWrite);
waitUntilLeader(shard);
final FiniteDuration duration = duration("5 seconds");
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
-
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true, true), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
- final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
+ final DataTreeModification modification = dataStore.newModification();
final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
- final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
+ final DataTreeModification modification = dataStore.newModification();
final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
@Test
public void testCommitWithPersistenceDisabled() throws Throwable {
+ testCommitWithPersistenceDisabled(true);
+ testCommitWithPersistenceDisabled(false);
+ }
+
+ public void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable {
dataStoreContextBuilder.persistent(false);
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCommitWithPersistenceDisabled");
+ "testCommitWithPersistenceDisabled-" + readWrite);
waitUntilLeader(shard);
final MutableCompositeModification modification = new MutableCompositeModification();
final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
- TestModel.TEST_PATH, containerNode, modification);
+ TestModel.TEST_PATH, containerNode, modification);
final FiniteDuration duration = duration("5 seconds");
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
-
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
}
@Test
- public void testCommitWhenTransactionHasNoModifications(){
+ public void testCommitWhenTransactionHasNoModifications() {
+ testCommitWhenTransactionHasNoModifications(true);
+ testCommitWhenTransactionHasNoModifications(false);
+ }
+
+ public void testCommitWhenTransactionHasNoModifications(final boolean readWrite){
// Note that persistence is enabled which would normally result in the entry getting written to the journal
// but here that need not happen
new ShardTestKit(getSystem()) {
{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCommitWhenTransactionHasNoModifications");
+ "testCommitWhenTransactionHasNoModifications-" + readWrite);
waitUntilLeader(shard);
final FiniteDuration duration = duration("5 seconds");
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
-
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
}
@Test
- public void testCommitWhenTransactionHasModifications(){
+ public void testCommitWhenTransactionHasModifications() {
+ testCommitWhenTransactionHasModifications(true);
+ testCommitWhenTransactionHasModifications(false);
+ }
+
+ public void testCommitWhenTransactionHasModifications(final boolean readWrite){
new ShardTestKit(getSystem()) {
{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCommitWhenTransactionHasModifications");
+ "testCommitWhenTransactionHasModifications-" + readWrite);
waitUntilLeader(shard);
final FiniteDuration duration = duration("5 seconds");
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
-
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
@Test
public void testCommitPhaseFailure() throws Throwable {
+ testCommitPhaseFailure(true);
+ testCommitPhaseFailure(false);
+ }
+
+ public void testCommitPhaseFailure(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCommitPhaseFailure");
+ "testCommitPhaseFailure-" + readWrite);
waitUntilLeader(shard);
final FiniteDuration duration = duration("5 seconds");
final Timeout timeout = new Timeout(duration);
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
-
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message for the first Tx.
@Test
public void testPreCommitPhaseFailure() throws Throwable {
+ testPreCommitPhaseFailure(true);
+ testPreCommitPhaseFailure(false);
+ }
+
+ public void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testPreCommitPhaseFailure");
+ "testPreCommitPhaseFailure-" + readWrite);
waitUntilLeader(shard);
final FiniteDuration duration = duration("5 seconds");
final Timeout timeout = new Timeout(duration);
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
-
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message for the first Tx.
shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
// Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
@Test
public void testCanCommitPhaseFailure() throws Throwable {
+ testCanCommitPhaseFailure(true);
+ testCanCommitPhaseFailure(false);
+ }
+
+ public void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCanCommitPhaseFailure");
+ "testCanCommitPhaseFailure-" + readWrite);
waitUntilLeader(shard);
final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
-
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort, modification, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
final String transactionID2 = "tx2";
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort, modification, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
@Test
public void testCanCommitPhaseFalseResponse() throws Throwable {
+ testCanCommitPhaseFalseResponse(true);
+ testCanCommitPhaseFalseResponse(false);
+ }
+
+ public void testCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCanCommitPhaseFalseResponse");
+ "testCanCommitPhaseFalseResponse-" + readWrite);
waitUntilLeader(shard);
final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
-
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort, modification, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
final String transactionID2 = "tx2";
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort, modification, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
@Test
public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
+ testImmediateCommitWithCanCommitPhaseFailure(true);
+ testImmediateCommitWithCanCommitPhaseFailure(false);
+ }
+
+ public void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testImmediateCommitWithCanCommitPhaseFailure");
+ "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
waitUntilLeader(shard);
final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
-
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort, modification, true, true), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef());
expectMsgClass(duration, akka.actor.Status.Failure.class);
doReturn(candidateRoot).when(candidate).getRootNode();
doReturn(candidate).when(cohort).getCandidate();
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort, modification, true, true), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
@Test
public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
+ testImmediateCommitWithCanCommitPhaseFalseResponse(true);
+ testImmediateCommitWithCanCommitPhaseFalseResponse(false);
+ }
+
+ public void testImmediateCommitWithCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testImmediateCommitWithCanCommitPhaseFalseResponse");
+ "testImmediateCommitWithCanCommitPhaseFalseResponse-" + readWrite);
waitUntilLeader(shard);
final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
-
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true, true), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
expectMsgClass(duration, akka.actor.Status.Failure.class);
doReturn(candidateRoot).when(candidate).getRootNode();
doReturn(candidate).when(cohort).getCandidate();
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort, modification, true, true), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
@Test
public void testAbortBeforeFinishCommit() throws Throwable {
+ testAbortBeforeFinishCommit(true);
+ testAbortBeforeFinishCommit(false);
+ }
+
+ public void testAbortBeforeFinishCommit(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testAbortBeforeFinishCommit");
+ "testAbortBeforeFinishCommit-" + readWrite);
waitUntilLeader(shard);
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
modification, preCommit);
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
@Test
public void testTransactionCommitTimeout() throws Throwable {
+ testTransactionCommitTimeout(true);
+ testTransactionCommitTimeout(false);
+ }
+
+ public void testTransactionCommitTimeout(final boolean readWrite) throws Throwable {
dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testTransactionCommitTimeout");
+ "testTransactionCommitTimeout-" + readWrite);
waitUntilLeader(shard);
// Ready the Tx's
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// canCommit 1st Tx. We don't send the commit so it should timeout.
// Ready the Tx's
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// The 3rd Tx should exceed queue capacity and fail.
- shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
- cohort3, modification3, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
expectMsgClass(duration, akka.actor.Status.Failure.class);
// canCommit 1st Tx.
final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
final String transactionID2 = "tx2";
final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
final String transactionID3 = "tx3";
final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
- shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
- cohort3, modification3, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// All Tx's are readied. We'll send canCommit for the last one but not the others. The others
final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// CanCommit the first one so it's the current in-progress CohortEntry.
final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Ready the third Tx.
final String transactionID3 = "tx3";
- final DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification();
+ final DataTreeModification modification3 = dataStore.newModification();
new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
.apply(modification3);
modification3.ready();
@Test
public void testAbortCurrentTransaction() throws Throwable {
+ testAbortCurrentTransaction(true);
+ testAbortCurrentTransaction(false);
+ }
+
+ public void testAbortCurrentTransaction(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testAbortCurrentTransaction");
+ "testAbortCurrentTransaction-" + readWrite);
waitUntilLeader(shard);
final FiniteDuration duration = duration("5 seconds");
final Timeout timeout = new Timeout(duration);
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
-
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message for the first Tx.
@Test
public void testAbortQueuedTransaction() throws Throwable {
+ testAbortQueuedTransaction(true);
+ testAbortQueuedTransaction(false);
+ }
+
+ public void testAbortQueuedTransaction(final boolean readWrite) throws Throwable {
dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
new ShardTestKit(getSystem()) {{
final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
final Creator<Shard> creator = new Creator<Shard>() {
@Override
public Shard create() throws Exception {
- return new Shard(shardID, Collections.<String,String>emptyMap(),
- dataStoreContextBuilder.build(), SCHEMA_CONTEXT) {
+ return new Shard(newShardBuilder()) {
@Override
public void onReceiveCommand(final Object message) throws Exception {
super.onReceiveCommand(message);
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
Props.create(new DelegatingShardCreator(creator)).withDispatcher(
- Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction");
+ Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction-" + readWrite);
waitUntilLeader(shard);
// Ready the tx.
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
new ShardTestKit(getSystem()) {{
class TestShard extends Shard {
- protected TestShard(final ShardIdentifier name, final Map<String, String> peerAddresses,
- final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
- super(name, peerAddresses, datastoreContext, schemaContext);
+ protected TestShard(AbstractBuilder<?, ?> builder) {
+ super(builder);
setPersistence(new TestPersistentDataProvider(super.persistence()));
}
final Creator<Shard> creator = new Creator<Shard>() {
@Override
public Shard create() throws Exception {
- return new TestShard(shardID, Collections.<String,String>emptyMap(),
- newDatastoreContext(), SCHEMA_CONTEXT);
+ return new TestShard(newShardBuilder());
}
};
Props.create(new DelegatingShardCreator(creator)), shardActorName);
waitUntilLeader(shard);
-
writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
// Trigger creation of a snapshot by ensuring
final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
-
- assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
-
- assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
- savedSnapshot.get() instanceof Snapshot);
-
- verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
-
- latch.set(new CountDownLatch(1));
- savedSnapshot.set(null);
+ awaitAndValidateSnapshot(expectedRoot);
raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
+ awaitAndValidateSnapshot(expectedRoot);
- assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
- assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
- savedSnapshot.get() instanceof Snapshot);
+ private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot
+ ) throws InterruptedException {
+ System.out.println("Inside awaitAndValidateSnapshot {}" + savedSnapshot.get());
+ assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
- verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
+ assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
+ savedSnapshot.get() instanceof Snapshot);
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
+ verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
- private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
+ latch.set(new CountDownLatch(1));
+ savedSnapshot.set(null);
+ }
- final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
- assertEquals("Root node", expectedRoot, actual);
+ private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
- }};
+ final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
+ assertEquals("Root node", expectedRoot, actual);
+
+ }
+ };
}
/**
*/
@Test
public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
- final DataTree store = InMemoryDataTreeFactory.getInstance().create();
+ final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
store.setSchemaContext(SCHEMA_CONTEXT);
final DataTreeModification putTransaction = store.takeSnapshot().newModification();
final DatastoreContext persistentContext = DatastoreContext.newBuilder().
shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
- final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
- persistentContext, SCHEMA_CONTEXT);
+ final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext).
+ schemaContext(SCHEMA_CONTEXT).props();
final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
- final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
- nonPersistentContext, SCHEMA_CONTEXT);
+ final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext).
+ schemaContext(SCHEMA_CONTEXT).props();
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
assertEquals("isRecoveryApplicable", false,
- shard.underlyingActor().persistence().isRecoveryApplicable());
+ shard.underlyingActor().persistence().isRecoveryApplicable());
shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
assertEquals("isRecoveryApplicable", true,
- shard.underlyingActor().persistence().isRecoveryApplicable());
+ shard.underlyingActor().persistence().isRecoveryApplicable());
shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
- ShardLeaderStateChanged.class);
+ ShardLeaderStateChanged.class);
assertEquals("getLocalShardDataTree present", true,
leaderStateChanged.getLocalShardDataTree().isPresent());
assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
- leaderStateChanged.getLocalShardDataTree().get());
+ leaderStateChanged.getLocalShardDataTree().get());
MessageCollectorActor.clearMessages(listener);
store.validate(modification);
store.commit(store.prepare(modification));
}
+
+ @Test
+ public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
+ new ShardTestKit(getSystem()) {{
+ String testName = "testClusteredDataChangeListenerDelayedRegistration";
+ dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
+
+ final MockDataChangeListener listener = new MockDataChangeListener(1);
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+ actorFactory.generateActorId(testName + "-DataChangeListener"));
+
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(testName + "-shard"));
+
+ waitUntilNoLeader(shard);
+
+ final YangInstanceIdentifier path = TestModel.TEST_PATH;
+
+ shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
+ final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterChangeListenerReply.class);
+ assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+ writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ shard.tell(new ElectionTimeout(), ActorRef.noSender());
+
+ listener.waitForChangeEvents();
+ }};
+ }
+
+ @Test
+ public void testClusteredDataChangeListenerRegistration() throws Exception {
+ new ShardTestKit(getSystem()) {{
+ String testName = "testClusteredDataChangeListenerRegistration";
+ final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
+ actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
+
+ final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
+ actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
+
+ final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
+ Shard.builder().id(followerShardID).
+ datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
+ peerAddresses(Collections.singletonMap(leaderShardID.toString(),
+ "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+ withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
+
+ final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
+ Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
+ peerAddresses(Collections.singletonMap(followerShardID.toString(),
+ "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+ withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
+
+ leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
+ String leaderPath = waitUntilLeader(followerShard);
+ assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
+
+ final YangInstanceIdentifier path = TestModel.TEST_PATH;
+ final MockDataChangeListener listener = new MockDataChangeListener(1);
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+ actorFactory.generateActorId(testName + "-DataChangeListener"));
+
+ followerShard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
+ final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterChangeListenerReply.class);
+ assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+
+ writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ listener.waitForChangeEvents();
+ }};
+ }
+
+ @Test
+ public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
+ new ShardTestKit(getSystem()) {{
+ String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
+ dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
+
+ final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+ final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
+ actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
+
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(testName + "-shard"));
+
+ waitUntilNoLeader(shard);
+
+ final YangInstanceIdentifier path = TestModel.TEST_PATH;
+
+ shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
+ final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterDataTreeChangeListenerReply.class);
+ assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+ writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ shard.tell(new ElectionTimeout(), ActorRef.noSender());
+
+ listener.waitForChangeEvents();
+ }};
+ }
+
+ @Test
+ public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
+ new ShardTestKit(getSystem()) {{
+ String testName = "testClusteredDataTreeChangeListenerRegistration";
+ final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
+ actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
+
+ final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
+ actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
+
+ final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
+ Shard.builder().id(followerShardID).
+ datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
+ peerAddresses(Collections.singletonMap(leaderShardID.toString(),
+ "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+ withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
+
+ final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
+ Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
+ peerAddresses(Collections.singletonMap(followerShardID.toString(),
+ "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+ withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
+
+ leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
+ String leaderPath = waitUntilLeader(followerShard);
+ assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
+
+ final YangInstanceIdentifier path = TestModel.TEST_PATH;
+ final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+ final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
+ actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
+
+ followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
+ final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterDataTreeChangeListenerReply.class);
+ assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+ writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ listener.waitForChangeEvents();
+ }};
+ }
+
+ @Test
+ public void testServerRemoved() throws Exception {
+ final TestActorRef<MessageCollectorActor> parent = TestActorRef.create(getSystem(), MessageCollectorActor.props());
+
+ final ActorRef shard = parent.underlyingActor().context().actorOf(
+ newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testServerRemoved");
+
+ shard.tell(new ServerRemoved("test"), ActorRef.noSender());
+
+ MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);
+
+ }
+
}