import akka.actor.Status;
import akka.actor.Status.Failure;
import akka.actor.Status.Success;
+import akka.actor.Terminated;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.dispatch.Dispatchers;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
primaryFound.getPrimaryPath().contains("member-1-shard-default"));
- assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
+ assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
}};
}
LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
primaryFound.getPrimaryPath().contains("member-1-shard-default"));
- assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
+ assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
}};
}
}
+ @Test
+ public void testServerRemovedShardActorNotRunning() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ MockConfiguration mockConfig =
+ new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+ put("default", Arrays.asList("member-1", "member-2")).
+ put("astronauts", Arrays.asList("member-2")).
+ put("people", Arrays.asList("member-1", "member-2")).build());
+
+ TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
+ newShardMgrProps(mockConfig));
+
+ shardManager.underlyingActor().waitForRecoveryComplete();
+
+ shardManager.tell(new FindLocalShard("people", false), getRef());
+ expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+
+ shardManager.tell(new FindLocalShard("default", false), getRef());
+ expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+
+ // Removed the default shard replica from member-1
+ ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
+ final ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type("config1").build();
+ shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
+
+ shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
+ }};
+ }
+
+ @Test
+ public void testServerRemovedShardActorRunning() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ MockConfiguration mockConfig =
+ new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+ put("default", Arrays.asList("member-1", "member-2")).
+ put("astronauts", Arrays.asList("member-2")).
+ put("people", Arrays.asList("member-1", "member-2")).build());
+
+ TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
+ newShardMgrProps(mockConfig));
+
+ TestActorRef<MessageCollectorActor> shard = TestActorRef.create(getSystem(), MessageCollectorActor.props());
+
+ watch(shard);
+
+ shardManager.underlyingActor().waitForRecoveryComplete();
+
+ shardManager.underlyingActor().addShardActor("default", shard);
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ shardManager.tell(new FindLocalShard("people", false), getRef());
+ expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+
+ shardManager.tell(new FindLocalShard("default", false), getRef());
+ expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+
+ // Removed the default shard replica from member-1
+ ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
+ final ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type("config1").build();
+ shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
+
+ shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
+
+ expectMsgClass(duration("5 seconds"), Terminated.class);
+ }};
+ }
+
+
@Test
public void testShardPersistenceWithRestoredData() throws Exception {
new JavaTestKit(getSystem()) {{
private static class TestShardManager extends ShardManager {
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
+ private final CountDownLatch snapshotPersist = new CountDownLatch(1);
+ private ShardManagerSnapshot snapshot;
+ private Map<String, ActorRef> shardActors = new HashMap<>();
private TestShardManager(Builder builder) {
super(builder);
return Props.create(TestShardManager.class, this);
}
}
+
+ @Override
+ public void saveSnapshot(Object obj) {
+ snapshot = (ShardManagerSnapshot) obj;
+ snapshotPersist.countDown();
+ }
+
+ void verifySnapshotPersisted(Set<String> shardList) {
+ assertEquals("saveSnapshot invoked", true,
+ Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
+ assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
+ }
+
+ @Override
+ protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
+ if(shardActors.get(info.getShardName()) != null){
+ return shardActors.get(info.getShardName());
+ }
+ return super.newShardActor(schemaContext, info);
+ }
+
+ public void addShardActor(String shardName, ActorRef actorRef){
+ shardActors.put(shardName, actorRef);
+ }
}
private static class DelegatingShardManagerCreator implements Creator<ShardManager> {