import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Status.Failure;
+import akka.actor.Status.Success;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.dispatch.Dispatchers;
import akka.japi.Creator;
import akka.pattern.Patterns;
import akka.persistence.RecoveryCompleted;
+import akka.serialization.Serialization;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
+import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
-import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
private static int ID_COUNTER = 1;
private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
- private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
+ private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
@Mock
private static CountDownLatch ready;
@Test
public void testOnRecoveryJournalIsCleaned() {
- InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
+ String persistenceID = "shard-manager-" + shardMrgIDSuffix;
+ InMemoryJournal.addEntry(persistenceID, 1L, new ShardManager.SchemaContextModules(
ImmutableSet.of("foo")));
- InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
+ InMemoryJournal.addEntry(persistenceID, 2L, new ShardManager.SchemaContextModules(
ImmutableSet.of("bar")));
- InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
+ InMemoryJournal.addDeleteMessagesCompleteLatch(persistenceID);
- new JavaTestKit(getSystem()) {{
- TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+ TestShardManager shardManager = newTestShardManager();
- shardManager.underlyingActor().waitForRecoveryComplete();
- InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
+ InMemoryJournal.waitForDeleteMessagesComplete(persistenceID);
- // Journal entries up to the last one should've been deleted
- Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
- synchronized (journal) {
- assertEquals("Journal size", 0, journal.size());
- }
- }};
+ // Journal entries up to the last one should've been deleted
+ Map<Long, Object> journal = InMemoryJournal.get(persistenceID);
+ synchronized (journal) {
+ assertEquals("Journal size", 0, journal.size());
+ }
}
@Test
}
@Test
- public void testOnReceiveCreateShard() {
+ public void testOnCreateShard() {
new JavaTestKit(getSystem()) {{
datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
"foo", null, Arrays.asList("member-1", "member-5", "member-6"));
shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef());
- expectMsgClass(duration("5 seconds"), CreateShardReply.class);
+ expectMsgClass(duration("5 seconds"), Success.class);
shardManager.tell(new FindLocalShard("foo", true), getRef());
shardBuilder.getId());
assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
- // Send CreateShard with same name - should fail.
+ // Send CreateShard with same name - should return Success with a message.
+
+ shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
+
+ Success success = expectMsgClass(duration("5 seconds"), Success.class);
+ assertNotNull("Success status is null", success.status());
+ }};
+ }
+
+ @Test
+ public void testOnCreateShardWithLocalMemberNotInShardConfig() {
+ new JavaTestKit(getSystem()) {{
+ datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
+
+ ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
+ new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
+
+ Shard.Builder shardBuilder = Shard.builder();
+ ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
+ "foo", null, Arrays.asList("member-5", "member-6"));
shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
+ expectMsgClass(duration("5 seconds"), Success.class);
- expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+ shardManager.tell(new FindLocalShard("foo", true), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+ assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
+ assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(),
+ shardBuilder.getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
}};
}
@Test
- public void testOnReceiveCreateShardWithNoInitialSchemaContext() {
+ public void testOnCreateShardWithNoInitialSchemaContext() {
new JavaTestKit(getSystem()) {{
ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
"foo", null, Arrays.asList("member-1"));
shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
- expectMsgClass(duration("5 seconds"), CreateShardReply.class);
+ expectMsgClass(duration("5 seconds"), Success.class);
SchemaContext schemaContext = TestModel.createTestContext();
shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
}
@Test
- public void testAddShardReplicaForNonExistentShard() throws Exception {
+ public void testAddShardReplicaForNonExistentShardConfig() throws Exception {
new JavaTestKit(getSystem()) {{
ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
}};
}
- @Test
- public void testAddShardReplicaForAlreadyCreatedShard() throws Exception {
- new JavaTestKit(getSystem()) {{
- ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
- shardManager.tell(new AddShardReplica("default"), getRef());
- Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
- assertEquals("Failure obtained", true,
- (resp.cause() instanceof IllegalArgumentException));
- }};
- }
-
@Test
public void testAddShardReplica() throws Exception {
MockConfiguration mockConfig =
}
@Test
- public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
- MockConfiguration mockConfig =
- new MockConfiguration(ImmutableMap.<String, List<String>>builder().
- put("default", Arrays.asList("member-1", "member-2")).
- put("astronauts", Arrays.asList("member-2")).build());
+ public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ TestActorRef<ForwardingShardManager> shardManager = TestActorRef.create(getSystem(),
+ newPropsShardMgrWithMockShardActor(), shardMgrID);
- String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
- // Create an ActorSystem ShardManager actor for member-1.
- final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
- Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
- ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
- final TestActorRef<ForwardingShardManager> newReplicaShardManager = TestActorRef.create(system1,
- newPropsShardMgrWithMockShardActor("shardManager1", mockDefaultShardActor,
- new ClusterWrapperImpl(system1), mockConfig), shardManagerID);
+ String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
+ AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
+ ActorRef leaderShardActor = shardManager.underlyingActor().getContext().actorOf(
+ Props.create(MockRespondActor.class, addServerReply), leaderId);
- new JavaTestKit(system1) {{
+ MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
+
+ String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager.tell(new RoleChangeNotification(newReplicaId,
+ RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
+ shardManager.tell(new ShardLeaderStateChanged(newReplicaId, leaderId, Optional.<DataTree>absent(),
+ DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+ shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
+
+ MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
+
+ Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
+ assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
+
+ shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+ // Send message again to verify previous in progress state is cleared
+
+ shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
+ resp = expectMsgClass(duration("5 seconds"), Failure.class);
+ assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
+
+ // Send message again with an AddServer timeout to verify the pre-existing shard actor isn't terminated.
+
+ shardManager.tell(newDatastoreContextFactory(datastoreContextBuilder.
+ shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), getRef());
+ leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
+ shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
+ expectMsgClass(duration("5 seconds"), Failure.class);
+
+ shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+ leaderShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testAddShardReplicaWithPreExistingLocalReplicaLeader() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+ ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+ shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mock(DataTree.class)),
+ DataStoreVersions.CURRENT_VERSION), getRef());
+ shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
+ RaftState.Leader.name())), mockShardActor);
+
+ shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
+ Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
+ assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
+
+ shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+ }};
+ }
+
+ @Test
+ public void testAddShardReplicaWithAddServerReplyFailure() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
+
+ MockConfiguration mockConfig =
+ new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+ put("astronauts", Arrays.asList("member-2")).build());
+
+ ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
+ TestActorRef<ForwardingShardManager> shardManager = TestActorRef.create(getSystem(),
+ newPropsShardMgrWithMockShardActor("newReplicaShardManager", mockNewReplicaShardActor,
+ new MockClusterWrapper(), mockConfig), shardMgrID);
+ shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ JavaTestKit terminateWatcher = new JavaTestKit(getSystem());
+ terminateWatcher.watch(mockNewReplicaShardActor);
+
+ shardManager.tell(new AddShardReplica("astronauts"), getRef());
+
+ AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
+ assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
+ addServerMsg.getNewServerId());
+ mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
+
+ Failure failure = expectMsgClass(duration("5 seconds"), Failure.class);
+ assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
+
+ shardManager.tell(new FindLocalShard("astronauts", false), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
+
+ terminateWatcher.expectTerminated(mockNewReplicaShardActor);
+
+ shardManager.tell(new AddShardReplica("astronauts"), getRef());
+ mockShardLeaderKit.expectMsgClass(AddServer.class);
+ mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
+ failure = expectMsgClass(duration("5 seconds"), Failure.class);
+ assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
+ }};
+ }
+
+ @Test
+ public void testAddShardReplicaWithAlreadyInProgress() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
+ JavaTestKit secondRequestKit = new JavaTestKit(getSystem());
+
+ MockConfiguration mockConfig =
+ new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+ put("astronauts", Arrays.asList("member-2")).build());
+
+ TestActorRef<ForwardingShardManager> shardManager = TestActorRef.create(getSystem(),
+ newPropsShardMgrWithMockShardActor("newReplicaShardManager", mockShardActor,
+ new MockClusterWrapper(), mockConfig), shardMgrID);
+ shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ shardManager.tell(new AddShardReplica("astronauts"), getRef());
+
+ mockShardLeaderKit.expectMsgClass(AddServer.class);
+
+ shardManager.tell(new AddShardReplica("astronauts"), secondRequestKit.getRef());
+
+ secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class);
+ }};
+ }
+
+ @Test
+ public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+ put("astronauts", Arrays.asList("member-2")).build());
+
+ ActorRef newReplicaShardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(
+ "shardManager", mockShardActor, new MockClusterWrapper(), mockConfig));
newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", getRef().path().toString());
- newReplicaShardManager.underlyingActor().waitForMemberUp();
newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
assertEquals("Failure obtained", true,
(resp.cause() instanceof RuntimeException));
}};
-
- JavaTestKit.shutdownActorSystem(system1);
}
@Test
put("people", Arrays.asList("member-1", "member-2")).build());
String[] restoredShards = {"default", "astronauts"};
ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
- InMemorySnapshotStore.addSnapshot(shardMgrID, snapshot);
+ InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
//create shardManager to come up with restored data
TestActorRef<TestShardManager> newRestoredShardManager = TestActorRef.create(getSystem(),
}
}
+ interface MessageInterceptor extends Function<Object, Object> {
+ boolean canIntercept(Object message);
+ }
+
+ private MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
+ return new MessageInterceptor(){
+ @Override
+ public Object apply(Object message) {
+ return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
+ }
+
+ @Override
+ public boolean canIntercept(Object message) {
+ return message instanceof FindPrimary;
+ }
+ };
+ }
+
private static class ForwardingShardManager extends ShardManager {
private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
private CountDownLatch memberUpReceived = new CountDownLatch(1);
private final String name;
private final CountDownLatch snapshotPersist = new CountDownLatch(1);
private ShardManagerSnapshot snapshot;
+ private volatile MessageInterceptor messageInterceptor;
public ForwardingShardManager(Builder builder, String name, ActorRef shardActor) {
super(builder);
this.name = name;
}
+ void setMessageInterceptor(MessageInterceptor messageInterceptor) {
+ this.messageInterceptor = messageInterceptor;
+ }
+
+
@Override
public void handleCommand(Object message) throws Exception {
try{
- super.handleCommand(message);
+ if(messageInterceptor != null && messageInterceptor.canIntercept(message)) {
+ getSender().tell(messageInterceptor.apply(message), getSelf());
+ } else {
+ super.handleCommand(message);
+ }
} finally {
if(message instanceof FindPrimary) {
findPrimaryMessageReceived.countDown();
}
private static class MockRespondActor extends MessageCollectorActor {
+ static final String CLEAR_RESPONSE = "clear-response";
private volatile Object responseMsg;
+ @SuppressWarnings("unused")
+ public MockRespondActor() {
+ }
+
+ @SuppressWarnings("unused")
+ public MockRespondActor(Object responseMsg) {
+ this.responseMsg = responseMsg;
+ }
+
public void updateResponse(Object response) {
responseMsg = response;
}
if (message instanceof AddServer) {
if (responseMsg != null) {
getSender().tell(responseMsg, getSelf());
- responseMsg = null;
}
+ } if(message.equals(CLEAR_RESPONSE)) {
+ responseMsg = null;
}
}
}