import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Uninterruptibles;
-import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
-import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in "
+ "InMemorySnapshotStore and journal recovery seq number will start from 1";
- @Test
- public void testRegisterChangeListener() throws Exception {
- new ShardTestKit(getSystem()) {
- {
- final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testRegisterChangeListener");
-
- waitUntilLeader(shard);
-
- shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
-
- final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener,
- TestModel.TEST_PATH), "testRegisterChangeListener-DataChangeListener");
-
- shard.tell(new RegisterChangeListener(TestModel.TEST_PATH, dclActor,
- AsyncDataBroker.DataChangeScope.BASE, true), getRef());
-
- final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("3 seconds"),
- RegisterDataTreeNotificationListenerReply.class);
- final String replyPath = reply.getListenerRegistrationPath().toString();
- assertTrue("Incorrect reply path: " + replyPath,
- replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
-
- final YangInstanceIdentifier path = TestModel.TEST_PATH;
- writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
- listener.waitForChangeEvents(path);
- }
- };
- }
-
- @SuppressWarnings("serial")
- @Test
- public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
- // This test tests the timing window in which a change listener is registered before the
- // shard becomes the leader. We verify that the listener is registered and notified of the
- // existing data when the shard becomes the leader.
- // For this test, we want to send the RegisterChangeListener message after the shard
- // has recovered from persistence and before it becomes the leader. So we subclass
- // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
- // we know that the shard has been initialized to a follower and has started the
- // election process. The following 2 CountDownLatches are used to coordinate the
- // ElectionTimeout with the sending of the RegisterChangeListener message.
- final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
- final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
- final Creator<Shard> creator = new Creator<Shard>() {
- boolean firstElectionTimeout = true;
-
- @Override
- public Shard create() throws Exception {
- // Use a non persistent provider because this test actually invokes persist on the journal
- // this will cause all other messages to not be queued properly after that.
- // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
- // it does do a persist)
- return new Shard(newShardBuilder()) {
- @Override
- public void handleCommand(final Object message) {
- if (message instanceof ElectionTimeout && firstElectionTimeout) {
- // Got the first ElectionTimeout. We don't forward it to the
- // base Shard yet until we've sent the RegisterChangeListener
- // message. So we signal the onFirstElectionTimeout latch to tell
- // the main thread to send the RegisterChangeListener message and
- // start a thread to wait on the onChangeListenerRegistered latch,
- // which the main thread signals after it has sent the message.
- // After the onChangeListenerRegistered is triggered, we send the
- // original ElectionTimeout message to proceed with the election.
- firstElectionTimeout = false;
- final ActorRef self = getSelf();
- new Thread(() -> {
- Uninterruptibles.awaitUninterruptibly(
- onChangeListenerRegistered, 5, TimeUnit.SECONDS);
- self.tell(message, self);
- }).start();
-
- onFirstElectionTimeout.countDown();
- } else {
- super.handleCommand(message);
- }
- }
- };
- }
- };
-
- setupInMemorySnapshotStore();
-
- final YangInstanceIdentifier path = TestModel.TEST_PATH;
- final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
- "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
-
- final TestActorRef<Shard> shard = actorFactory.createTestActor(
- Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testRegisterChangeListenerWhenNotLeaderInitially");
-
- new ShardTestKit(getSystem()) {
- {
- // Wait until the shard receives the first ElectionTimeout
- // message.
- assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
-
- // Now send the RegisterChangeListener and wait for the reply.
- shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.SUBTREE, false),
- getRef());
-
- final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterDataTreeNotificationListenerReply.class);
- assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
-
- // Sanity check - verify the shard is not the leader yet.
- shard.tell(FindLeader.INSTANCE, getRef());
- final FindLeaderReply findLeadeReply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
- assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
-
- // Signal the onChangeListenerRegistered latch to tell the
- // thread above to proceed
- // with the election process.
- onChangeListenerRegistered.countDown();
-
- // Wait for the shard to become the leader and notify our
- // listener with the existing
- // data in the store.
- listener.waitForChangeEvents(path);
- }
- };
- }
-
@Test
public void testRegisterDataTreeChangeListener() throws Exception {
new ShardTestKit(getSystem()) {
boolean firstElectionTimeout = true;
@Override
- public Shard create() throws Exception {
+ public Shard create() {
return new Shard(newShardBuilder()) {
@Override
public void handleCommand(final Object message) {
}
@Test
- public void testPeerAddressResolved() throws Exception {
+ public void testPeerAddressResolved() {
new ShardTestKit(getSystem()) {
{
final ShardIdentifier peerID = ShardIdentifier.create("inventory", MemberName.forName("member-2"),
}
}
- void onSuccess(final Object resp) throws Exception {
+ void onSuccess(final Object resp) {
}
}
}
@Override
- void onSuccess(final Object resp) throws Exception {
+ void onSuccess(final Object resp) {
final CanCommitTransactionReply canCommitReply =
CanCommitTransactionReply.fromSerializable(resp);
assertEquals("Can commit", true, canCommitReply.getCanCommit());
}
@Test
- public void testBatchedModificationsWithNoCommitOnReady() throws Exception {
+ public void testBatchedModificationsWithNoCommitOnReady() {
new ShardTestKit(getSystem()) {
{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
}
@Test
- public void testBatchedModificationsWithCommitOnReady() throws Exception {
+ public void testBatchedModificationsWithCommitOnReady() {
new ShardTestKit(getSystem()) {
{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
final TransactionIdentifier transactionID = nextTransactionId();
final BatchedModifications batched = new BatchedModifications(transactionID,
DataStoreVersions.CURRENT_VERSION);
- batched.setReady(true);
+ batched.setReady();
batched.setTotalMessagesSent(2);
shard.tell(batched, getRef());
}
@Test
- public void testBatchedModificationsWithOperationFailure() throws Exception {
+ public void testBatchedModificationsWithOperationFailure() {
new ShardTestKit(getSystem()) {
{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
final Throwable cause = failure.cause();
batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
- batched.setReady(true);
+ batched.setReady();
batched.setTotalMessagesSent(2);
shard.tell(batched, getRef());
}
@Test
- public void testBatchedModificationsOnTransactionChain() throws Exception {
+ public void testBatchedModificationsOnTransactionChain() {
new ShardTestKit(getSystem()) {
{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
private static final long serialVersionUID = 1L;
@Override
- public Shard create() throws Exception {
+ public Shard create() {
return new Shard(newShardBuilder()) {
@Override
protected boolean isLeader() {
failure = expectMsgClass(Failure.class);
assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
- shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true), getRef());
+ shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true, Optional.empty()),
+ getRef());
failure = expectMsgClass(Failure.class);
assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
}
}
@Test
- public void testReadyWithReadWriteImmediateCommit() throws Exception {
+ public void testReadyWithReadWriteImmediateCommit() {
testReadyWithImmediateCommit(true);
}
@Test
- public void testReadyWithWriteOnlyImmediateCommit() throws Exception {
+ public void testReadyWithWriteOnlyImmediateCommit() {
testReadyWithImmediateCommit(false);
}
- private void testReadyWithImmediateCommit(final boolean readWrite) throws Exception {
+ private void testReadyWithImmediateCommit(final boolean readWrite) {
new ShardTestKit(getSystem()) {
{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
}
@Test
- public void testReadyLocalTransactionWithImmediateCommit() throws Exception {
+ public void testReadyLocalTransactionWithImmediateCommit() {
new ShardTestKit(getSystem()) {
{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
final TransactionIdentifier txId = nextTransactionId();
modification.ready();
- final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
+ final ReadyLocalTransaction readyMessage =
+ new ReadyLocalTransaction(txId, modification, true, Optional.empty());
shard.tell(readyMessage, getRef());
}
@Test
- public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception {
+ public void testReadyLocalTransactionWithThreePhaseCommit() {
new ShardTestKit(getSystem()) {
{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
final TransactionIdentifier txId = nextTransactionId();
modification.ready();
- final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
+ final ReadyLocalTransaction readyMessage =
+ new ReadyLocalTransaction(txId, modification, false, Optional.empty());
shard.tell(readyMessage, getRef());
}
@Test
- public void testReadWriteCommitWithPersistenceDisabled() throws Exception {
+ public void testReadWriteCommitWithPersistenceDisabled() {
dataStoreContextBuilder.persistent(false);
new ShardTestKit(getSystem()) {
{
}
@Test
- public void testAbortWithCommitPending() throws Exception {
+ public void testAbortWithCommitPending() {
new ShardTestKit(getSystem()) {
{
final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
// }
@Test
- public void testTransactionCommitWithPriorExpiredCohortEntries() throws Exception {
+ public void testTransactionCommitWithPriorExpiredCohortEntries() {
dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
new ShardTestKit(getSystem()) {
{
}
@Test
- public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Exception {
+ public void testTransactionCommitWithSubsequentExpiredCohortEntry() {
dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
new ShardTestKit(getSystem()) {
{
.apply(modification3);
modification3.ready();
final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3,
- true);
+ true, Optional.empty());
shard.tell(readyMessage, getRef());
// Commit the first Tx. After completing, the second should
}
@Test
- public void testCanCommitBeforeReadyFailure() throws Exception {
+ public void testCanCommitBeforeReadyFailure() {
new ShardTestKit(getSystem()) {
{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
}
@Test
- public void testAbortAfterReady() throws Exception {
+ public void testAbortAfterReady() {
dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
new ShardTestKit(getSystem()) {
{
}
@Test
- public void testAbortQueuedTransaction() throws Exception {
+ public void testAbortQueuedTransaction() {
new ShardTestKit(getSystem()) {
{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
}
private void awaitAndValidateSnapshot(final NormalizedNode<?, ?> expectedRoot)
- throws InterruptedException, IOException {
+ throws InterruptedException {
assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
assertTrue("Invalid saved snapshot " + savedSnapshot.get(), savedSnapshot.get() instanceof Snapshot);
savedSnapshot.set(null);
}
- private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?, ?> expectedRoot)
- throws IOException {
+ private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?, ?> expectedRoot) {
final NormalizedNode<?, ?> actual = ((ShardSnapshotState)snapshot.getState()).getSnapshot()
.getRootNode().get();
assertEquals("Root node", expectedRoot, actual);
* This test simply verifies that the applySnapShot logic will work.
*/
@Test
- public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
+ public void testInMemoryDataTreeRestore() throws DataValidationFailedException {
final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
SCHEMA_CONTEXT);
}
@Test
- public void testRegisterRoleChangeListener() throws Exception {
+ public void testRegisterRoleChangeListener() {
new ShardTestKit(getSystem()) {
{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
}
@Test
- public void testFollowerInitialSyncStatus() throws Exception {
+ public void testFollowerInitialSyncStatus() {
final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testFollowerInitialSyncStatus");
assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
}
- @Test
- public void testClusteredDataChangeListenerWithDelayedRegistration() throws Exception {
- new ShardTestKit(getSystem()) {
- {
- final String testName = "testClusteredDataChangeListenerWithDelayedRegistration";
- dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
- .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
-
- final YangInstanceIdentifier path = TestModel.TEST_PATH;
- final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
- actorFactory.generateActorId(testName + "-DataChangeListener"));
-
- setupInMemorySnapshotStore();
-
- final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
- actorFactory.generateActorId(testName + "-shard"));
-
- waitUntilNoLeader(shard);
-
- shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true),
- getRef());
- final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterDataTreeNotificationListenerReply.class);
- assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
-
- shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
- .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
-
- listener.waitForChangeEvents();
- }
- };
- }
-
- @Test
- public void testClusteredDataChangeListenerRegistration() throws Exception {
- new ShardTestKit(getSystem()) {
- {
- final String testName = "testClusteredDataChangeListenerRegistration";
- final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
- MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
-
- final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
- MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
-
- final TestActorRef<Shard> followerShard = actorFactory
- .createTestActor(Shard.builder().id(followerShardID)
- .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
- .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
- "akka://test/user/" + leaderShardID.toString()))
- .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
- .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
-
- final TestActorRef<Shard> leaderShard = actorFactory
- .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
- .peerAddresses(Collections.singletonMap(followerShardID.toString(),
- "akka://test/user/" + followerShardID.toString()))
- .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
- .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
-
- leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
- final String leaderPath = waitUntilLeader(followerShard);
- assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
-
- final YangInstanceIdentifier path = TestModel.TEST_PATH;
- final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
- actorFactory.generateActorId(testName + "-DataChangeListener"));
-
- followerShard.tell(
- new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true),
- getRef());
- final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterDataTreeNotificationListenerReply.class);
- assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
-
- writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
- listener.waitForChangeEvents();
- }
- };
- }
-
@Test
public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception {
new ShardTestKit(getSystem()) {
}
@Test
- public void testServerRemoved() throws Exception {
+ public void testServerRemoved() {
final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props()
.withDispatcher(Dispatchers.DefaultDispatcherId()));