import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Uninterruptibles;
-import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
-import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in "
+ "InMemorySnapshotStore and journal recovery seq number will start from 1";
- @Test
- public void testRegisterChangeListener() throws Exception {
- new ShardTestKit(getSystem()) {
- {
- final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testRegisterChangeListener");
-
- waitUntilLeader(shard);
-
- shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
-
- final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener,
- TestModel.TEST_PATH), "testRegisterChangeListener-DataChangeListener");
-
- shard.tell(new RegisterChangeListener(TestModel.TEST_PATH, dclActor,
- AsyncDataBroker.DataChangeScope.BASE, true), getRef());
-
- final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
- RegisterChangeListenerReply.class);
- final String replyPath = reply.getListenerRegistrationPath().toString();
- assertTrue("Incorrect reply path: " + replyPath,
- replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
-
- final YangInstanceIdentifier path = TestModel.TEST_PATH;
- writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
- listener.waitForChangeEvents(path);
- }
- };
- }
-
- @SuppressWarnings("serial")
- @Test
- public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
- // This test tests the timing window in which a change listener is registered before the
- // shard becomes the leader. We verify that the listener is registered and notified of the
- // existing data when the shard becomes the leader.
- // For this test, we want to send the RegisterChangeListener message after the shard
- // has recovered from persistence and before it becomes the leader. So we subclass
- // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
- // we know that the shard has been initialized to a follower and has started the
- // election process. The following 2 CountDownLatches are used to coordinate the
- // ElectionTimeout with the sending of the RegisterChangeListener message.
- final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
- final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
- final Creator<Shard> creator = new Creator<Shard>() {
- boolean firstElectionTimeout = true;
-
- @Override
- public Shard create() throws Exception {
- // Use a non persistent provider because this test actually invokes persist on the journal
- // this will cause all other messages to not be queued properly after that.
- // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
- // it does do a persist)
- return new Shard(newShardBuilder()) {
- @Override
- public void handleCommand(final Object message) {
- if (message instanceof ElectionTimeout && firstElectionTimeout) {
- // Got the first ElectionTimeout. We don't forward it to the
- // base Shard yet until we've sent the RegisterChangeListener
- // message. So we signal the onFirstElectionTimeout latch to tell
- // the main thread to send the RegisterChangeListener message and
- // start a thread to wait on the onChangeListenerRegistered latch,
- // which the main thread signals after it has sent the message.
- // After the onChangeListenerRegistered is triggered, we send the
- // original ElectionTimeout message to proceed with the election.
- firstElectionTimeout = false;
- final ActorRef self = getSelf();
- new Thread() {
- @Override
- public void run() {
- Uninterruptibles.awaitUninterruptibly(
- onChangeListenerRegistered, 5, TimeUnit.SECONDS);
- self.tell(message, self);
- }
- }.start();
-
- onFirstElectionTimeout.countDown();
- } else {
- super.handleCommand(message);
- }
- }
- };
- }
- };
-
- setupInMemorySnapshotStore();
-
- final YangInstanceIdentifier path = TestModel.TEST_PATH;
- final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
- "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
-
- final TestActorRef<Shard> shard = actorFactory.createTestActor(
- Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testRegisterChangeListenerWhenNotLeaderInitially");
-
- new ShardTestKit(getSystem()) {
- {
- // Wait until the shard receives the first ElectionTimeout
- // message.
- assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
-
- // Now send the RegisterChangeListener and wait for the reply.
- shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.SUBTREE, false),
- getRef());
-
- final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterChangeListenerReply.class);
- assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
-
- // Sanity check - verify the shard is not the leader yet.
- shard.tell(FindLeader.INSTANCE, getRef());
- final FindLeaderReply findLeadeReply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
- assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
-
- // Signal the onChangeListenerRegistered latch to tell the
- // thread above to proceed
- // with the election process.
- onChangeListenerRegistered.countDown();
-
- // Wait for the shard to become the leader and notify our
- // listener with the existing
- // data in the store.
- listener.waitForChangeEvents(path);
- }
- };
- }
-
@Test
public void testRegisterDataTreeChangeListener() throws Exception {
new ShardTestKit(getSystem()) {
shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
- final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
- RegisterDataTreeChangeListenerReply.class);
+ final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("3 seconds"),
+ RegisterDataTreeNotificationListenerReply.class);
final String replyPath = reply.getListenerRegistrationPath().toString();
assertTrue("Incorrect reply path: " + replyPath,
replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
boolean firstElectionTimeout = true;
@Override
- public Shard create() throws Exception {
+ public Shard create() {
return new Shard(newShardBuilder()) {
@Override
public void handleCommand(final Object message) {
if (message instanceof ElectionTimeout && firstElectionTimeout) {
firstElectionTimeout = false;
final ActorRef self = getSelf();
- new Thread() {
- @Override
- public void run() {
- Uninterruptibles.awaitUninterruptibly(
- onChangeListenerRegistered, 5, TimeUnit.SECONDS);
- self.tell(message, self);
- }
- }.start();
+ new Thread(() -> {
+ Uninterruptibles.awaitUninterruptibly(
+ onChangeListenerRegistered, 5, TimeUnit.SECONDS);
+ self.tell(message, self);
+ }).start();
onFirstElectionTimeout.countDown();
} else {
assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
- final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterDataTreeChangeListenerReply.class);
+ final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterDataTreeNotificationListenerReply.class);
assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
shard.tell(FindLeader.INSTANCE, getRef());
CreateTransactionReply.class);
final String path = reply.getTransactionPath().toString();
- assertTrue("Unexpected transaction path " + path, path
- .startsWith("akka://test/user/testCreateTransaction/shard-member-1:ShardTransactionTest@0:"));
+ assertTrue("Unexpected transaction path " + path, path.startsWith(String.format(
+ "akka://test/user/testCreateTransaction/shard-%s-%s:ShardTransactionTest@0:",
+ shardID.getShardName(), shardID.getMemberName().getName())));
}
};
}
CreateTransactionReply.class);
final String path = reply.getTransactionPath().toString();
- assertTrue("Unexpected transaction path " + path, path.startsWith(
- "akka://test/user/testCreateTransactionOnChain/shard-member-1:ShardTransactionTest@0:"));
+ assertTrue("Unexpected transaction path " + path, path.startsWith(String.format(
+ "akka://test/user/testCreateTransactionOnChain/shard-%s-%s:ShardTransactionTest@0:",
+ shardID.getShardName(), shardID.getMemberName().getName())));
}
};
}
@Test
- public void testPeerAddressResolved() throws Exception {
+ public void testPeerAddressResolved() {
new ShardTestKit(getSystem()) {
{
final ShardIdentifier peerID = ShardIdentifier.create("inventory", MemberName.forName("member-2"),
ShardTestKit.waitUntilLeader(shard);
- final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
- store.setSchemaContext(SCHEMA_CONTEXT);
+ final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
+ SCHEMA_CONTEXT);
final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
ShardTestKit.waitUntilLeader(shard);
- final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
- store.setSchemaContext(SCHEMA_CONTEXT);
+ final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
+ SCHEMA_CONTEXT);
final DataTreeModification writeMod = store.takeSnapshot().newModification();
final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
}
}
- void onSuccess(final Object resp) throws Exception {
+ void onSuccess(final Object resp) {
}
}
}
@Override
- void onSuccess(final Object resp) throws Exception {
+ void onSuccess(final Object resp) {
final CanCommitTransactionReply canCommitReply =
CanCommitTransactionReply.fromSerializable(resp);
assertEquals("Can commit", true, canCommitReply.getCanCommit());
final ReadyTransactionReply readyReply = ReadyTransactionReply
.fromSerializable(expectMsgClass(duration, ReadyTransactionReply.class));
assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
-
// Send the CanCommitTransaction message for the first Tx.
shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
- if (caughtEx.get() != null) {
- Throwables.propagateIfInstanceOf(caughtEx.get(), Exception.class);
- Throwables.propagate(caughtEx.get());
+ final Throwable t = caughtEx.get();
+ if (t != null) {
+ Throwables.propagateIfPossible(t, Exception.class);
+ throw new RuntimeException(t);
}
assertEquals("Commits complete", true, done);
verifyOuterListEntry(shard, 1);
- verifyLastApplied(shard, 2);
+ verifyLastApplied(shard, 5);
}
};
}
@Test
- public void testBatchedModificationsWithNoCommitOnReady() throws Exception {
+ public void testBatchedModificationsWithNoCommitOnReady() {
new ShardTestKit(getSystem()) {
{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
}
@Test
- public void testBatchedModificationsWithCommitOnReady() throws Exception {
+ public void testBatchedModificationsWithCommitOnReady() {
new ShardTestKit(getSystem()) {
{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
final TransactionIdentifier transactionID = nextTransactionId();
final BatchedModifications batched = new BatchedModifications(transactionID,
DataStoreVersions.CURRENT_VERSION);
- batched.setReady(true);
+ batched.setReady();
batched.setTotalMessagesSent(2);
shard.tell(batched, getRef());
final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
if (failure != null) {
- Throwables.propagateIfInstanceOf(failure.cause(), Exception.class);
- Throwables.propagate(failure.cause());
+ Throwables.propagateIfPossible(failure.cause(), Exception.class);
+ throw new RuntimeException(failure.cause());
}
}
};
}
@Test
- public void testBatchedModificationsWithOperationFailure() throws Exception {
+ public void testBatchedModificationsWithOperationFailure() {
new ShardTestKit(getSystem()) {
{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
final Throwable cause = failure.cause();
batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
- batched.setReady(true);
+ batched.setReady();
batched.setTotalMessagesSent(2);
shard.tell(batched, getRef());
}
@Test
- public void testBatchedModificationsOnTransactionChain() throws Exception {
+ public void testBatchedModificationsOnTransactionChain() {
new ShardTestKit(getSystem()) {
{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
private static final long serialVersionUID = 1L;
@Override
- public Shard create() throws Exception {
+ public Shard create() {
return new Shard(newShardBuilder()) {
@Override
protected boolean isLeader() {
failure = expectMsgClass(Failure.class);
assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
- shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true), getRef());
+ shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true, Optional.empty()),
+ getRef());
failure = expectMsgClass(Failure.class);
assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
}
}
@Test
- public void testReadyWithReadWriteImmediateCommit() throws Exception {
+ public void testReadyWithReadWriteImmediateCommit() {
testReadyWithImmediateCommit(true);
}
@Test
- public void testReadyWithWriteOnlyImmediateCommit() throws Exception {
+ public void testReadyWithWriteOnlyImmediateCommit() {
testReadyWithImmediateCommit(false);
}
- private void testReadyWithImmediateCommit(final boolean readWrite) throws Exception {
+ private void testReadyWithImmediateCommit(final boolean readWrite) {
new ShardTestKit(getSystem()) {
{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
}
@Test
- public void testReadyLocalTransactionWithImmediateCommit() throws Exception {
+ public void testReadyLocalTransactionWithImmediateCommit() {
new ShardTestKit(getSystem()) {
{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
final TransactionIdentifier txId = nextTransactionId();
modification.ready();
- final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
+ final ReadyLocalTransaction readyMessage =
+ new ReadyLocalTransaction(txId, modification, true, Optional.empty());
shard.tell(readyMessage, getRef());
}
@Test
- public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception {
+ public void testReadyLocalTransactionWithThreePhaseCommit() {
new ShardTestKit(getSystem()) {
{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
final TransactionIdentifier txId = nextTransactionId();
modification.ready();
- final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
+ final ReadyLocalTransaction readyMessage =
+ new ReadyLocalTransaction(txId, modification, false, Optional.empty());
shard.tell(readyMessage, getRef());
}
@Test
- public void testReadWriteCommitWithPersistenceDisabled() throws Exception {
+ public void testReadWriteCommitWithPersistenceDisabled() {
dataStoreContextBuilder.persistent(false);
new ShardTestKit(getSystem()) {
{
private void testCommitWhenTransactionHasModifications(final boolean readWrite) throws Exception {
new ShardTestKit(getSystem()) {
{
- final TipProducingDataTree dataTree = createDelegatingMockDataTree();
+ final DataTree dataTree = createDelegatingMockDataTree();
final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testCommitWhenTransactionHasModifications-" + readWrite);
inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
+ // Purge request is scheduled as asynchronous, wait for two heartbeats to let it propagate into
+ // the journal
+ Thread.sleep(HEARTBEAT_MILLIS * 2);
+
shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
// Commit index should advance as we do not have an empty
// modification
- assertEquals(0, shardStats.getCommitIndex());
+ assertEquals(1, shardStats.getCommitIndex());
}
};
}
public void testCommitPhaseFailure() throws Exception {
new ShardTestKit(getSystem()) {
{
- final TipProducingDataTree dataTree = createDelegatingMockDataTree();
+ final DataTree dataTree = createDelegatingMockDataTree();
final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testCommitPhaseFailure");
public void testPreCommitPhaseFailure() throws Exception {
new ShardTestKit(getSystem()) {
{
- final TipProducingDataTree dataTree = createDelegatingMockDataTree();
+ final DataTree dataTree = createDelegatingMockDataTree();
final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testPreCommitPhaseFailure");
public void testCanCommitPhaseFailure() throws Exception {
new ShardTestKit(getSystem()) {
{
- final TipProducingDataTree dataTree = createDelegatingMockDataTree();
+ final DataTree dataTree = createDelegatingMockDataTree();
final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testCanCommitPhaseFailure");
private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Exception {
new ShardTestKit(getSystem()) {
{
- final TipProducingDataTree dataTree = createDelegatingMockDataTree();
+ final DataTree dataTree = createDelegatingMockDataTree();
final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
}
@Test
- public void testAbortWithCommitPending() throws Exception {
+ public void testAbortWithCommitPending() {
new ShardTestKit(getSystem()) {
{
final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
// }
@Test
- public void testTransactionCommitWithPriorExpiredCohortEntries() throws Exception {
+ public void testTransactionCommitWithPriorExpiredCohortEntries() {
dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
new ShardTestKit(getSystem()) {
{
}
@Test
- public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Exception {
+ public void testTransactionCommitWithSubsequentExpiredCohortEntry() {
dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
new ShardTestKit(getSystem()) {
{
.apply(modification3);
modification3.ready();
final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3,
- true);
+ true, Optional.empty());
shard.tell(readyMessage, getRef());
// Commit the first Tx. After completing, the second should
}
@Test
- public void testCanCommitBeforeReadyFailure() throws Exception {
+ public void testCanCommitBeforeReadyFailure() {
new ShardTestKit(getSystem()) {
{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
}
@Test
- public void testAbortAfterReady() throws Exception {
+ public void testAbortAfterReady() {
dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
new ShardTestKit(getSystem()) {
{
}
@Test
- public void testAbortQueuedTransaction() throws Exception {
+ public void testAbortQueuedTransaction() {
new ShardTestKit(getSystem()) {
{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
}
private void awaitAndValidateSnapshot(final NormalizedNode<?, ?> expectedRoot)
- throws InterruptedException, IOException {
+ throws InterruptedException {
assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
assertTrue("Invalid saved snapshot " + savedSnapshot.get(), savedSnapshot.get() instanceof Snapshot);
savedSnapshot.set(null);
}
- private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?, ?> expectedRoot)
- throws IOException {
+ private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?, ?> expectedRoot) {
final NormalizedNode<?, ?> actual = ((ShardSnapshotState)snapshot.getState()).getSnapshot()
.getRootNode().get();
assertEquals("Root node", expectedRoot, actual);
* This test simply verifies that the applySnapShot logic will work.
*/
@Test
- public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
- final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
- store.setSchemaContext(SCHEMA_CONTEXT);
+ public void testInMemoryDataTreeRestore() throws DataValidationFailedException {
+ final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
+ SCHEMA_CONTEXT);
final DataTreeModification putTransaction = store.takeSnapshot().newModification();
putTransaction.write(TestModel.TEST_PATH,
.shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext)
- .schemaContext(SCHEMA_CONTEXT).props();
+ .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder()
.shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext)
- .schemaContext(SCHEMA_CONTEXT).props();
+ .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
new ShardTestKit(getSystem()) {
{
}
@Test
- public void testRegisterRoleChangeListener() throws Exception {
+ public void testRegisterRoleChangeListener() {
new ShardTestKit(getSystem()) {
{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
waitUntilLeader(shard);
- final TestActorRef<MessageCollectorActor> listener =
- TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+ final ActorRef listener = getSystem().actorOf(MessageCollectorActor.props());
shard.tell(new RegisterRoleChangeListener(), listener);
}
@Test
- public void testFollowerInitialSyncStatus() throws Exception {
+ public void testFollowerInitialSyncStatus() {
final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testFollowerInitialSyncStatus");
}
@Test
- public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
+ public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception {
new ShardTestKit(getSystem()) {
{
- final String testName = "testClusteredDataChangeListenerDelayedRegistration";
+ final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistration";
dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
- final YangInstanceIdentifier path = TestModel.TEST_PATH;
- final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
- actorFactory.generateActorId(testName + "-DataChangeListener"));
+ final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+ final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
+ TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
setupInMemorySnapshotStore();
waitUntilNoLeader(shard);
- shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true),
- getRef());
- final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterChangeListenerReply.class);
+ shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
+ final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterDataTreeNotificationListenerReply.class);
assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
}
@Test
- public void testClusteredDataChangeListenerRegistration() throws Exception {
- new ShardTestKit(getSystem()) {
- {
- final String testName = "testClusteredDataChangeListenerRegistration";
- final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
- MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
-
- final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
- MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
-
- final TestActorRef<Shard> followerShard = actorFactory
- .createTestActor(Shard.builder().id(followerShardID)
- .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
- .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
- "akka://test/user/" + leaderShardID.toString()))
- .schemaContext(SCHEMA_CONTEXT).props()
- .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
-
- final TestActorRef<Shard> leaderShard = actorFactory
- .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
- .peerAddresses(Collections.singletonMap(followerShardID.toString(),
- "akka://test/user/" + followerShardID.toString()))
- .schemaContext(SCHEMA_CONTEXT).props()
- .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
-
- leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
- final String leaderPath = waitUntilLeader(followerShard);
- assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
-
- final YangInstanceIdentifier path = TestModel.TEST_PATH;
- final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
- actorFactory.generateActorId(testName + "-DataChangeListener"));
-
- followerShard.tell(
- new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true),
- getRef());
- final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterChangeListenerReply.class);
- assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
-
- writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
- listener.waitForChangeEvents();
- }
- };
- }
-
- @Test
- public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
+ public void testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed() throws Exception {
new ShardTestKit(getSystem()) {
{
- final String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
+ final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed";
dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
- final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+ final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(0);
final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
waitUntilNoLeader(shard);
shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
- final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterDataTreeChangeListenerReply.class);
+ final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterDataTreeNotificationListenerReply.class);
assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+ final ActorSelection regActor = getSystem().actorSelection(reply.getListenerRegistrationPath());
+ regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), getRef());
+ expectMsgClass(CloseDataTreeNotificationListenerRegistrationReply.class);
+
shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
.customRaftPolicyImplementation(null).build(), ActorRef.noSender());
- listener.waitForChangeEvents();
+ listener.expectNoMoreChanges("Received unexpected change after close");
}
};
}
.datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
.peerAddresses(Collections.singletonMap(leaderShardID.toString(),
"akka://test/user/" + leaderShardID.toString()))
- .schemaContext(SCHEMA_CONTEXT).props()
+ .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
final TestActorRef<Shard> leaderShard = actorFactory
.createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
.peerAddresses(Collections.singletonMap(followerShardID.toString(),
"akka://test/user/" + followerShardID.toString()))
- .schemaContext(SCHEMA_CONTEXT).props()
+ .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
- final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterDataTreeChangeListenerReply.class);
+ final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterDataTreeNotificationListenerReply.class);
assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
}
@Test
- public void testServerRemoved() throws Exception {
- final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props());
+ public void testServerRemoved() {
+ final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()));
final ActorRef shard = parent.underlyingActor().context().actorOf(
newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),