import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Uninterruptibles;
+import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), testKit.getRef());
- final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(testKit.duration("3 seconds"),
+ final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(3),
RegisterDataTreeNotificationListenerReply.class);
final String replyPath = reply.getListenerRegistrationPath().toString();
assertTrue("Incorrect reply path: " + replyPath,
"testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
final ShardTestKit testKit = new ShardTestKit(getSystem());
- assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
+ assertTrue("Got first ElectionTimeout", onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), testKit.getRef());
- final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"),
+ final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
RegisterDataTreeNotificationListenerReply.class);
assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
shard.tell(FindLeader.INSTANCE, testKit.getRef());
- final FindLeaderReply findLeadeReply = testKit.expectMsgClass(testKit.duration("5 seconds"),
- FindLeaderReply.class);
+ final FindLeaderReply findLeadeReply = testKit.expectMsgClass(Duration.ofSeconds(5), FindLeaderReply.class);
assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
onChangeListenerRegistered.countDown();
shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
DataStoreVersions.CURRENT_VERSION).toSerializable(), testKit.getRef());
- final CreateTransactionReply reply = testKit.expectMsgClass(testKit.duration("3 seconds"),
+ final CreateTransactionReply reply = testKit.expectMsgClass(Duration.ofSeconds(3),
CreateTransactionReply.class);
final String path = reply.getTransactionPath().toString();
shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
DataStoreVersions.CURRENT_VERSION).toSerializable(), testKit.getRef());
- final CreateTransactionReply reply = testKit.expectMsgClass(testKit.duration("3 seconds"),
+ final CreateTransactionReply reply = testKit.expectMsgClass(Duration.ofSeconds(3),
CreateTransactionReply.class);
final String path = reply.getTransactionPath().toString();
final CountDownLatch commitLatch = new CountDownLatch(2);
final long timeoutSec = 5;
- final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
- final Timeout timeout = new Timeout(duration);
+ final Duration duration = Duration.ofSeconds(timeoutSec);
+ final Timeout timeout = Timeout.create(duration);
final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
@Override
void onSuccess(final Object resp) {
- final CanCommitTransactionReply canCommitReply =
- CanCommitTransactionReply.fromSerializable(resp);
- assertEquals("Can commit", true, canCommitReply.getCanCommit());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(resp);
+ assertTrue("Can commit", canCommitReply.getCanCommit());
final Future<Object> commitFuture = Patterns.ask(shard,
new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), timeout);
shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
.fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
- assertEquals("Can commit", true, canCommitReply.getCanCommit());
+ assertTrue("Can commit", canCommitReply.getCanCommit());
// Ready 2 more Tx's.
throw new RuntimeException(t);
}
- assertEquals("Commits complete", true, done);
+ assertTrue("Commits complete", done);
// final InOrder inOrder = inOrder(cohort1.getCanCommit(), cohort1.getPreCommit(), cohort1.getCommit(),
// cohort2.getCanCommit(), cohort2.getPreCommit(), cohort2.getCommit(), cohort3.getCanCommit(),
ShardTestKit.waitUntilLeader(shard);
final TransactionIdentifier transactionID = nextTransactionId();
- final FiniteDuration duration = testKit.duration("5 seconds");
+ final Duration duration = Duration.ofSeconds(5);
// Send a BatchedModifications to start a transaction.
shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
.fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
- assertEquals("Can commit", true, canCommitReply.getCanCommit());
+ assertTrue("Can commit", canCommitReply.getCanCommit());
// Send the CommitTransaction message.
ShardTestKit.waitUntilLeader(shard);
final TransactionIdentifier transactionID = nextTransactionId();
- final FiniteDuration duration = testKit.duration("5 seconds");
+ final Duration duration = Duration.ofSeconds(5);
// Send a BatchedModifications to start a transaction.
shard.tell(batched, testKit.getRef());
- final Failure failure = testKit.expectMsgClass(testKit.duration("5 seconds"), Failure.class);
+ final Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
if (failure != null) {
Throwables.propagateIfPossible(failure.cause(), Exception.class);
BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION);
batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
shard.tell(batched, testKit.getRef());
- Failure failure = testKit.expectMsgClass(testKit.duration("5 seconds"), akka.actor.Status.Failure.class);
+ Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
final Throwable cause = failure.cause();
shard.tell(batched, testKit.getRef());
- failure = testKit.expectMsgClass(testKit.duration("5 seconds"), akka.actor.Status.Failure.class);
+ failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
assertEquals("Failure cause", cause, failure.cause());
}
final TransactionIdentifier transactionID1 = new TransactionIdentifier(historyId, 0);
final TransactionIdentifier transactionID2 = new TransactionIdentifier(historyId, 1);
- final FiniteDuration duration = testKit.duration("5 seconds");
+ final Duration duration = Duration.ofSeconds(5);
// Send a BatchedModifications to start a chained write
// transaction and ready it.
shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal(),
DataStoreVersions.CURRENT_VERSION).toSerializable(), testKit.getRef());
- final CreateTransactionReply createReply = testKit.expectMsgClass(testKit.duration("3 seconds"),
+ final CreateTransactionReply createReply = testKit.expectMsgClass(Duration.ofSeconds(3),
CreateTransactionReply.class);
getSystem().actorSelection(createReply.getTransactionPath())
.tell(new ReadData(path, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
- final ReadDataReply readReply = testKit.expectMsgClass(testKit.duration("3 seconds"), ReadDataReply.class);
+ final ReadDataReply readReply = testKit.expectMsgClass(Duration.ofSeconds(3), ReadDataReply.class);
assertEquals("Read node", containerNode, readReply.getNormalizedNode());
// Commit the write transaction.
shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
.fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
- assertEquals("Can commit", true, canCommitReply.getCanCommit());
+ assertTrue("Can commit", canCommitReply.getCanCommit());
shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
testKit.expectMsgClass(duration, CommitTransactionReply.class);
testKit.getRef());
}
- testKit.expectMsgClass(testKit.duration("5 seconds"), CommitTransactionReply.class);
+ testKit.expectMsgClass(Duration.ofSeconds(5), CommitTransactionReply.class);
final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
- final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
+ final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
+ .addChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
+ .build();
new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
final TransactionIdentifier txId = nextTransactionId();
final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
- final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
+ final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
+ .addChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
+ .build();
new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
final TransactionIdentifier txId = nextTransactionId();
shard.tell(new CanCommitTransaction(txId, CURRENT_VERSION).toSerializable(), testKit.getRef());
final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
.fromSerializable(testKit.expectMsgClass(CanCommitTransactionReply.class));
- assertEquals("Can commit", true, canCommitReply.getCanCommit());
+ assertTrue("Can commit", canCommitReply.getCanCommit());
// Send the CanCommitTransaction message.
// Setup a simulated transactions with a mock cohort.
- final FiniteDuration duration = testKit.duration("5 seconds");
+ final Duration duration = Duration.ofSeconds(5);
final TransactionIdentifier transactionID = nextTransactionId();
final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
.fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
- assertEquals("Can commit", true, canCommitReply.getCanCommit());
+ assertTrue("Can commit", canCommitReply.getCanCommit());
// Send the CanCommitTransaction message.
ShardTestKit.waitUntilLeader(shard);
- final FiniteDuration duration = testKit.duration("5 seconds");
+ final Duration duration = Duration.ofSeconds(5);
final TransactionIdentifier transactionID = nextTransactionId();
if (readWrite) {
shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
.fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
- assertEquals("Can commit", true, canCommitReply.getCanCommit());
+ assertTrue("Can commit", canCommitReply.getCanCommit());
shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
testKit.expectMsgClass(duration, CommitTransactionReply.class);
ShardTestKit.waitUntilLeader(shard);
- final FiniteDuration duration = testKit.duration("5 seconds");
- final Timeout timeout = new Timeout(duration);
+ final Duration duration = Duration.ofSeconds(5);
+ final Timeout timeout = Timeout.create(duration);
// Setup 2 simulated transactions with mock cohorts. The first
// one fails in the
shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
.fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
- assertEquals("Can commit", true, canCommitReply.getCanCommit());
+ assertTrue("Can commit", canCommitReply.getCanCommit());
// Send the CanCommitTransaction message for the 2nd Tx. This
// should get queued and
}
}, getSystem().dispatcher());
- assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
+ assertTrue("2nd CanCommit complete", latch.await(5, TimeUnit.SECONDS));
final InOrder inOrder = inOrder(dataTree);
inOrder.verify(dataTree).validate(any(DataTreeModification.class));
ShardTestKit.waitUntilLeader(shard);
- final FiniteDuration duration = testKit.duration("5 seconds");
- final Timeout timeout = new Timeout(duration);
+ final Duration duration = Duration.ofSeconds(5);
+ final Timeout timeout = Timeout.create(duration);
doThrow(new RuntimeException("mock preCommit failure")).when(dataTree)
.prepare(any(DataTreeModification.class));
shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
.fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
- assertEquals("Can commit", true, canCommitReply.getCanCommit());
+ assertTrue("Can commit", canCommitReply.getCanCommit());
// Send the CanCommitTransaction message for the 2nd Tx. This
// should get queued and
}
}, getSystem().dispatcher());
- assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
+ assertTrue("2nd CanCommit complete", latch.await(5, TimeUnit.SECONDS));
final InOrder inOrder = inOrder(dataTree);
inOrder.verify(dataTree).validate(any(DataTreeModification.class));
ShardTestKit.waitUntilLeader(shard);
- final FiniteDuration duration = testKit.duration("5 seconds");
+ final Duration duration = Duration.ofSeconds(5);
final TransactionIdentifier transactionID1 = nextTransactionId();
doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure"))
shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
final CanCommitTransactionReply reply = CanCommitTransactionReply
.fromSerializable(testKit.expectMsgClass(CanCommitTransactionReply.class));
- assertEquals("getCanCommit", true, reply.getCanCommit());
+ assertTrue("getCanCommit", reply.getCanCommit());
}
@Test
doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure"))
.doNothing().when(dataTree).validate(any(DataTreeModification.class));
- final FiniteDuration duration = testKit.duration("5 seconds");
+ final Duration duration = Duration.ofSeconds(5);
final TransactionIdentifier transactionID1 = nextTransactionId();
ShardTestKit.waitUntilLeader(shard);
- final FiniteDuration duration = testKit.duration("5 seconds");
+ final Duration duration = Duration.ofSeconds(5);
final TransactionIdentifier transactionID = nextTransactionId();
ShardTestKit.waitUntilLeader(shard);
- final FiniteDuration duration = testKit.duration("5 seconds");
+ final Duration duration = Duration.ofSeconds(5);
writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
writeToStore(shard, TestModel.OUTER_LIST_PATH,
ShardTestKit.waitUntilLeader(shard);
- final FiniteDuration duration = testKit.duration("5 seconds");
+ final Duration duration = Duration.ofSeconds(5);
final TransactionIdentifier transactionID1 = nextTransactionId();
shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
ShardTestKit.waitUntilLeader(shard);
- final FiniteDuration duration = testKit.duration("5 seconds");
+ final Duration duration = Duration.ofSeconds(5);
final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
"testCanCommitBeforeReadyFailure");
shard.tell(new CanCommitTransaction(nextTransactionId(), CURRENT_VERSION).toSerializable(), testKit.getRef());
- testKit.expectMsgClass(testKit.duration("5 seconds"), akka.actor.Status.Failure.class);
+ testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
}
@Test
ShardTestKit.waitUntilLeader(shard);
- final FiniteDuration duration = testKit.duration("5 seconds");
- final Timeout timeout = new Timeout(duration);
+ final Duration duration = Duration.ofSeconds(5);
+ final Timeout timeout = Timeout.create(duration);
// Ready 2 transactions - the first one will be aborted.
shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
.fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
- assertEquals("Can commit", true, canCommitReply.getCanCommit());
+ assertTrue("Can commit", canCommitReply.getCanCommit());
// Send the CanCommitTransaction message for the 2nd Tx. This
// should get queued and
// Wait for the 2nd Tx to complete the canCommit phase.
- canCommitReply = (CanCommitTransactionReply) Await.result(canCommitFuture, duration);
- assertEquals("Can commit", true, canCommitReply.getCanCommit());
+ canCommitReply = (CanCommitTransactionReply) Await.result(canCommitFuture,
+ FiniteDuration.create(5, TimeUnit.SECONDS));
+ assertTrue("Can commit", canCommitReply.getCanCommit());
}
@Test
ShardTestKit.waitUntilLeader(shard);
- final FiniteDuration duration = testKit.duration("5 seconds");
+ final Duration duration = Duration.ofSeconds(5);
// Ready a tx.
ShardTestKit.waitUntilLeader(shard);
- final FiniteDuration duration = testKit.duration("5 seconds");
+ final Duration duration = Duration.ofSeconds(5);
// Ready 3 tx's.
}
}
- final ShardTestKit testKit = new ShardTestKit(getSystem());
-
final Creator<Shard> creator = () -> new TestShard(newShardBuilder());
final TestActorRef<Shard> shard = actorFactory.createTestActor(Props
private static void awaitAndValidateSnapshot(final AtomicReference<CountDownLatch> latch,
final AtomicReference<Object> savedSnapshot, final NormalizedNode<?, ?> expectedRoot)
throws InterruptedException {
- assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
+ assertTrue("Snapshot saved", latch.get().await(5, TimeUnit.SECONDS));
assertTrue("Invalid saved snapshot " + savedSnapshot.get(), savedSnapshot.get() instanceof Snapshot);
final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testOnDatastoreContext");
- assertEquals("isRecoveryApplicable", true, shard.underlyingActor().persistence().isRecoveryApplicable());
+ assertTrue("isRecoveryApplicable", shard.underlyingActor().persistence().isRecoveryApplicable());
ShardTestKit.waitUntilLeader(shard);
shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
- assertEquals("isRecoveryApplicable", false, shard.underlyingActor().persistence().isRecoveryApplicable());
+ assertFalse("isRecoveryApplicable", shard.underlyingActor().persistence().isRecoveryApplicable());
shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
- assertEquals("isRecoveryApplicable", true, shard.underlyingActor().persistence().isRecoveryApplicable());
+ assertTrue("isRecoveryApplicable", shard.underlyingActor().persistence().isRecoveryApplicable());
}
@Test
ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
ShardLeaderStateChanged.class);
- assertEquals("getLocalShardDataTree present", true,
- leaderStateChanged.getLocalShardDataTree().isPresent());
+ assertTrue("getLocalShardDataTree present", leaderStateChanged.getLocalShardDataTree().isPresent());
assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
leaderStateChanged.getLocalShardDataTree().get());
shard.tell(new RequestVote(10000, "member2", 50, 50), testKit.getRef());
- leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
- ShardLeaderStateChanged.class);
- assertEquals("getLocalShardDataTree present", false, leaderStateChanged.getLocalShardDataTree().isPresent());
+ leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener, ShardLeaderStateChanged.class);
+ assertFalse("getLocalShardDataTree present", leaderStateChanged.getLocalShardDataTree().isPresent());
}
@Test
shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(false,
"member-1-shard-inventory-operational"));
- assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
+ assertFalse(shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(true,
"member-1-shard-inventory-operational"));
- assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
+ assertTrue(shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
}
@Test
testKit.waitUntilNoLeader(shard);
shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), testKit.getRef());
- final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"),
+ final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
RegisterDataTreeNotificationListenerReply.class);
assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
testKit.waitUntilNoLeader(shard);
shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), testKit.getRef());
- final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"),
+ final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
RegisterDataTreeNotificationListenerReply.class);
assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), testKit.getRef());
- final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"),
+ final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
RegisterDataTreeNotificationListenerReply.class);
assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());