package org.opendaylight.controller.cluster.datastore.shardmanager;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import java.net.URI;
+import java.time.Duration;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
- assertEquals("Shard actors created", true, newShardActorLatch.await(5, TimeUnit.SECONDS));
+ assertTrue("Shard actors created", newShardActorLatch.await(5, TimeUnit.SECONDS));
assertEquals("getShardElectionTimeoutFactor", 6,
shardInfoMap.get("default").getValue().getShardElectionTimeoutFactor());
assertEquals("getShardElectionTimeoutFactor", 7,
shardManager.tell(new FindPrimary("non-existent", false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), PrimaryNotFoundException.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), PrimaryNotFoundException.class);
}
@Test
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
- LocalPrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"),
+ LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5),
LocalPrimaryShardFound.class);
assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
primaryFound.getPrimaryPath().contains("member-1-shard-default"));
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
}
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
- RemotePrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"),
- RemotePrimaryShardFound.class);
+ RemotePrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
primaryFound.getPrimaryPath().contains("member-2-shard-default"));
assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
}
@Test
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
}
@Test
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
DataTree mockDataTree = mock(DataTree.class);
shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
- LocalPrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"),
+ LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5),
LocalPrimaryShardFound.class);
assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
primaryFound.getPrimaryPath().contains("member-1-shard-default"));
// RoleChangeNotification.
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
- kit.expectNoMessage(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
+ kit.expectNoMessage(Duration.ofMillis(150));
shardManager.tell(new ActorInitialized(), mockShardActor);
- kit.expectNoMessage(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
+ kit.expectNoMessage(Duration.ofMillis(150));
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
shardManager.tell(
new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
mockShardActor);
- kit.expectNoMessage(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
+ kit.expectNoMessage(Duration.ofMillis(150));
DataTree mockDataTree = mock(DataTree.class);
shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
DataStoreVersions.CURRENT_VERSION), mockShardActor);
- LocalPrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"),
- LocalPrimaryShardFound.class);
+ LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
primaryFound.getPrimaryPath().contains("member-1-shard-default"));
assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
- kit.expectNoMessage(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
+ kit.expectNoMessage(Duration.ofMillis(200));
LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
}
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
- kit.expectMsgClass(kit.duration("2 seconds"), NotInitializedException.class);
+ kit.expectMsgClass(Duration.ofSeconds(2), NotInitializedException.class);
shardManager.tell(new ActorInitialized(), mockShardActor);
- kit.expectNoMessage(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
+ kit.expectNoMessage(Duration.ofMillis(200));
LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
}
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
- kit.expectMsgClass(kit.duration("2 seconds"), NoShardLeaderException.class);
+ kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
}
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true),kit. getRef());
- kit.expectMsgClass(kit.duration("2 seconds"), NoShardLeaderException.class);
+ kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
}
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
- kit.expectMsgClass(kit.duration("2 seconds"), NoShardLeaderException.class);
+ kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
}
shardManager1.underlyingActor().waitForMemberUp();
shardManager1.tell(new FindPrimary("astronauts", false), kit.getRef());
- RemotePrimaryShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
+ RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
String path = found.getPrimaryPath();
assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
//
// shardManager1.tell(new FindPrimary("astronauts", false), getRef());
//
-// expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
+// expectMsgClass(Duration.ofSeconds(5), PrimaryNotFoundException.class);
LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
}
shardManager1.tell(new FindPrimary("default", true), kit.getRef());
- RemotePrimaryShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
+ RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
String path = found.getPrimaryPath();
assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
shardManager1.tell(new FindPrimary("default", true), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
shardManager1.tell(MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
kit.getRef());
shardManager1.tell(new FindPrimary("default", true), kit.getRef());
- RemotePrimaryShardFound found1 = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
+ RemotePrimaryShardFound found1 = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
String path1 = found1.getPrimaryPath();
assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
shardManager1.tell(
MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
- RemotePrimaryShardFound found2 = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
+ RemotePrimaryShardFound found2 = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
String path2 = found2.getPrimaryPath();
assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config"));
shardManager1.tell(new FindPrimary("default", true), kit.getRef());
- RemotePrimaryShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
+ RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
String path = found.getPrimaryPath();
assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
shardManager1.tell(new FindPrimary("default", true), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
assertNull("Expected primaryShardInfoCache entry removed",
primaryShardInfoCache.getIfPresent("default"));
shardManager1.tell(new FindPrimary("default", true), kit.getRef());
- LocalPrimaryShardFound found1 = kit.expectMsgClass(kit.duration("5 seconds"), LocalPrimaryShardFound.class);
+ LocalPrimaryShardFound found1 = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
String path1 = found1.getPrimaryPath();
assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
- LocalPrimaryShardFound found = kit256.expectMsgClass(kit256.duration("5 seconds"),
- LocalPrimaryShardFound.class);
+ LocalPrimaryShardFound found = kit256.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
String path = found.getPrimaryPath();
assertTrue("Unexpected primary path " + path + " which must on member-256",
path.contains("member-256-shard-default-config"));
// Make sure leader shard on member-256 is still leader and still in the cache.
shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
- found = kit256.expectMsgClass(kit256.duration("5 seconds"), LocalPrimaryShardFound.class);
+ found = kit256.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
path = found.getPrimaryPath();
assertTrue("Unexpected primary path " + path + " which must still not on member-256",
path.contains("member-256-shard-default-config"));
shardManager.tell(new FindLocalShard("non-existent", false), kit.getRef());
- LocalShardNotFound notFound = kit.expectMsgClass(kit.duration("5 seconds"), LocalShardNotFound.class);
+ LocalShardNotFound notFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
assertEquals("getShardName", "non-existent", notFound.getShardName());
}
shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
- LocalShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
+ LocalShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
assertTrue("Found path contains " + found.getPath().path().toString(),
found.getPath().path().toString().contains("member-1-shard-default-config"));
shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
}
@Test
TestShardManager shardManager = newTestShardManager();
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
- shardManager.onReceiveCommand(new RoleChangeNotification(
+ shardManager.handleCommand(new RoleChangeNotification(
memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
verify(ready, never()).countDown();
- shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
+ shardManager.handleCommand(new ShardLeaderStateChanged(memberId, memberId,
mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
verify(ready, times(1)).countDown();
TestShardManager shardManager = newTestShardManager();
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
- shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
+ shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
verify(ready, never()).countDown();
- shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
+ shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
- shardManager.onReceiveCommand(
+ shardManager.handleCommand(
new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
TestShardManager shardManager = newTestShardManager();
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
- shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
+ shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
verify(ready, never()).countDown();
- shardManager.onReceiveCommand(
+ shardManager.handleCommand(
new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
- shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
+ shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
verify(ready, times(1)).countDown();
}
public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
TestShardManager shardManager = newTestShardManager();
- shardManager.onReceiveCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(),
+ shardManager.handleCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(),
RaftState.Leader.name()));
verify(ready, never()).countDown();
public void testByDefaultSyncStatusIsFalse() {
TestShardManager shardManager = newTestShardManager();
- assertEquals(false, shardManager.getMBean().getSyncStatus());
+ assertFalse(shardManager.getMBean().getSyncStatus());
}
@Test
public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception {
TestShardManager shardManager = newTestShardManager();
- shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
+ shardManager.handleCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
RaftState.Follower.name(), RaftState.Leader.name()));
- assertEquals(true, shardManager.getMBean().getSyncStatus());
+ assertTrue(shardManager.getMBean().getSyncStatus());
}
@Test
TestShardManager shardManager = newTestShardManager();
String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
- shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
+ shardManager.handleCommand(new RoleChangeNotification(shardId,
RaftState.Follower.name(), RaftState.Candidate.name()));
- assertEquals(false, shardManager.getMBean().getSyncStatus());
+ assertFalse(shardManager.getMBean().getSyncStatus());
// Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
- shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(
+ shardManager.handleCommand(new FollowerInitialSyncUpStatus(
true, shardId));
- assertEquals(false, shardManager.getMBean().getSyncStatus());
+ assertFalse(shardManager.getMBean().getSyncStatus());
}
@Test
TestShardManager shardManager = newTestShardManager();
String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
- shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
+ shardManager.handleCommand(new RoleChangeNotification(shardId,
RaftState.Candidate.name(), RaftState.Follower.name()));
// Initially will be false
- assertEquals(false, shardManager.getMBean().getSyncStatus());
+ assertFalse(shardManager.getMBean().getSyncStatus());
// Send status true will make sync status true
- shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
+ shardManager.handleCommand(new FollowerInitialSyncUpStatus(true, shardId));
- assertEquals(true, shardManager.getMBean().getSyncStatus());
+ assertTrue(shardManager.getMBean().getSyncStatus());
// Send status false will make sync status false
- shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
+ shardManager.handleCommand(new FollowerInitialSyncUpStatus(false, shardId));
- assertEquals(false, shardManager.getMBean().getSyncStatus());
+ assertFalse(shardManager.getMBean().getSyncStatus());
}
@Test
}));
// Initially will be false
- assertEquals(false, shardManager.getMBean().getSyncStatus());
+ assertFalse(shardManager.getMBean().getSyncStatus());
// Make default shard leader
String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
- shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId,
+ shardManager.handleCommand(new RoleChangeNotification(defaultShardId,
RaftState.Follower.name(), RaftState.Leader.name()));
// default = Leader, astronauts is unknown so sync status remains false
- assertEquals(false, shardManager.getMBean().getSyncStatus());
+ assertFalse(shardManager.getMBean().getSyncStatus());
// Make astronauts shard leader as well
String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
- shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
+ shardManager.handleCommand(new RoleChangeNotification(astronautsShardId,
RaftState.Follower.name(), RaftState.Leader.name()));
// Now sync status should be true
- assertEquals(true, shardManager.getMBean().getSyncStatus());
+ assertTrue(shardManager.getMBean().getSyncStatus());
// Make astronauts a Follower
- shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
+ shardManager.handleCommand(new RoleChangeNotification(astronautsShardId,
RaftState.Leader.name(), RaftState.Follower.name()));
// Sync status is not true
- assertEquals(false, shardManager.getMBean().getSyncStatus());
+ assertFalse(shardManager.getMBean().getSyncStatus());
// Make the astronauts follower sync status true
- shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
+ shardManager.handleCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
// Sync status is now true
- assertEquals(true, shardManager.getMBean().getSyncStatus());
+ assertTrue(shardManager.getMBean().getSyncStatus());
LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
}
"foo", null, members("member-1", "member-5", "member-6"));
shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
- assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent());
+ assertFalse("isRecoveryApplicable", shardBuilder.getDatastoreContext().isPersistent());
assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig()
.getPeerAddressResolver() instanceof ShardPeerAddressResolver);
assertEquals("peerMembers", Sets.newHashSet(
shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
- Success success = kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
+ Success success = kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
assertNotNull("Success status is null", success.status());
LOG.info("testOnCreateShard ending");
"foo", null, members("member-5", "member-6"));
shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(), shardBuilder
"foo", null, members("member-1"));
shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
SchemaContext schemaContext = TEST_SCHEMA_CONTEXT;
shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
.withDispatcher(Dispatchers.DefaultDispatcherId()));
shardManager.tell(new AddShardReplica("model-inventory"), kit.getRef());
- Status.Failure resp = kit.expectMsgClass(kit.duration("2 seconds"), Status.Failure.class);
+ Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(2), Status.Failure.class);
- assertEquals("Failure obtained", true, resp.cause() instanceof IllegalArgumentException);
+ assertTrue("Failure obtained", resp.cause() instanceof IllegalArgumentException);
}
@Test
AddServer.class);
String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
- kit.expectMsgClass(kit.duration("5 seconds"), Status.Success.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), Status.Success.class);
InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
- Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
+ Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
// Send message again to verify previous in progress state is
// cleared
shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
- resp = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
+ resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
// Send message again with an AddServer timeout to verify the
datastoreContextBuilder.shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), kit.getRef());
leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
}
mockShardActor);
shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
- Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
+ Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
}
addServerMsg.getNewServerId());
mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
- Failure failure = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
+ Failure failure = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
shardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), LocalShardNotFound.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
terminateWatcher.expectTerminated(mockNewReplicaShardActor);
shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
mockShardLeaderKit.expectMsgClass(AddServer.class);
mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
- failure = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
+ failure = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
AddressFromURIString.parse("akka://non-existent@127.0.0.1:5").toString());
newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
- Status.Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Status.Failure.class);
- assertEquals("Failure obtained", true, resp.cause() instanceof RuntimeException);
+ Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Status.Failure.class);
+ assertTrue("Failure obtained", resp.cause() instanceof RuntimeException);
LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
}
.withDispatcher(Dispatchers.DefaultDispatcherId()));
shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), kit.getRef());
- Status.Failure resp = kit.expectMsgClass(kit.duration("10 seconds"), Status.Failure.class);
- assertEquals("Failure obtained", true, resp.cause() instanceof PrimaryNotFoundException);
+ Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(10), Status.Failure.class);
+ assertTrue("Failure obtained", resp.cause() instanceof PrimaryNotFoundException);
}
@Test
RemoveServer.class);
assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
removeServer.getServerId());
- kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
}
@Test
RemoveServer.class);
String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
- kit.expectMsgClass(kit.duration("5 seconds"), Status.Success.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), Status.Success.class);
}
@Test
shardManager.tell(secondServerChange, secondRequestKit.getRef());
- secondRequestKit.expectMsgClass(secondRequestKit.duration("5 seconds"), Failure.class);
+ secondRequestKit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
}
@Test
shardManager.underlyingActor().waitForRecoveryComplete();
shardManager.tell(new FindLocalShard("people", false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
shardManager.tell(new FindLocalShard("default", false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
// Removed the default shard replica from member-1
ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
newRestoredShardManager.tell(new FindLocalShard("people", false), kit.getRef());
- LocalShardNotFound notFound = kit.expectMsgClass(kit.duration("5 seconds"), LocalShardNotFound.class);
+ LocalShardNotFound notFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
assertEquals("for uninitialized shard", "people", notFound.getShardName());
// Verify a local shard is created for the restored shards,
// as the actor initialization
// message is not sent for them
newRestoredShardManager.tell(new FindLocalShard("default", false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
newRestoredShardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
LOG.info("testShardPersistenceWithRestoredData ending");
}
.create("default", MemberName.forName("member-2"), shardMrgIDSuffix).toString(),
Boolean.TRUE));
- kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
}
@Test
MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
- Status.Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Status.Failure.class);
- assertEquals("Failure resposnse", true, resp.cause() instanceof NoShardLeaderException);
+ Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Status.Failure.class);
+ assertTrue("Failure resposnse", resp.cause() instanceof NoShardLeaderException);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testRegisterForShardLeaderChanges() {
+ LOG.info("testRegisterForShardLeaderChanges starting");
+
+ final String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
+ final String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
+ final TestKit kit = new TestKit(getSystem());
+ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ final Consumer<String> mockCallback = mock(Consumer.class);
+ shardManager.tell(new RegisterForShardAvailabilityChanges(mockCallback), kit.getRef());
+
+ final Success reply = kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
+ final Registration reg = (Registration) reply.status();
+
+ final DataTree mockDataTree = mock(DataTree.class);
+ shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
+ DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+ verify(mockCallback, timeout(5000)).accept("default");
+
+ reset(mockCallback);
+ shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
+ DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verifyNoMoreInteractions(mockCallback);
+
+ shardManager.tell(new ShardLeaderStateChanged(memberId1, null, mockDataTree,
+ DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+ verify(mockCallback, timeout(5000)).accept("default");
+
+ reset(mockCallback);
+ shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, mockDataTree,
+ DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+ verify(mockCallback, timeout(5000)).accept("default");
+
+ reset(mockCallback);
+ reg.close();
+
+ shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
+ DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verifyNoMoreInteractions(mockCallback);
+
+ LOG.info("testRegisterForShardLeaderChanges ending");
}
public static class TestShardManager extends ShardManager {
}
void waitForRecoveryComplete() {
- assertEquals("Recovery complete", true,
+ assertTrue("Recovery complete",
Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
}
public void waitForMemberUp() {
- assertEquals("MemberUp received", true,
+ assertTrue("MemberUp received",
Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
memberUpReceived = new CountDownLatch(1);
}
void waitForMemberRemoved() {
- assertEquals("MemberRemoved received", true,
+ assertTrue("MemberRemoved received",
Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
memberRemovedReceived = new CountDownLatch(1);
}
void waitForUnreachableMember() {
- assertEquals("UnreachableMember received", true,
- Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
- ));
+ assertTrue("UnreachableMember received",
+ Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS));
memberUnreachableReceived = new CountDownLatch(1);
}
void waitForReachableMember() {
- assertEquals("ReachableMember received", true,
+ assertTrue("ReachableMember received",
Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
memberReachableReceived = new CountDownLatch(1);
}
void verifyFindPrimary() {
- assertEquals("FindPrimary received", true,
+ assertTrue("FindPrimary received",
Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
findPrimaryMessageReceived = new CountDownLatch(1);
}
}
void verifySnapshotPersisted(final Set<String> shardList) {
- assertEquals("saveSnapshot invoked", true,
+ assertTrue("saveSnapshot invoked",
Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
}