import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
+
private ShardSnapshot restoreFromSnapshot;
+
+
protected Shard(AbstractBuilder<?, ?> builder) {
super(builder.getId().toString(), builder.getPeerAddresses(),
Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name);
+
}
private void setTransactionCommitTimeout() {
context().parent().tell(message, self());
} else if(GET_SHARD_MBEAN_MESSAGE.equals(message)){
sender().tell(getShardMBean(), self());
- } else if(message instanceof GetShardDataTree){
+ } else if(message instanceof GetShardDataTree) {
sender().tell(store.getDataTree(), self());
+ } else if(message instanceof ServerRemoved){
+ context().parent().forward(message, context());
} else {
super.onReceiveCommand(message);
}
applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
} else {
Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
- DataTreeCandidatePayload.create(candidate));
+ DataTreeCandidatePayload.create(candidate));
}
}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
onRemoveShardReplica((RemoveShardReplica)message);
} else if(message instanceof GetSnapshot) {
onGetSnapshot();
+ } else if(message instanceof ServerRemoved){
+ onShardReplicaRemoved((ServerRemoved) message);
} else if (message instanceof SaveSnapshotSuccess) {
- LOG.debug ("{} saved ShardManager snapshot successfully", persistenceId());
+ LOG.debug("{} saved ShardManager snapshot successfully", persistenceId());
} else if (message instanceof SaveSnapshotFailure) {
LOG.error ("{}: SaveSnapshotFailure received for saving snapshot of shards",
persistenceId(), ((SaveSnapshotFailure)message).cause());
}
}
+ private void onShardReplicaRemoved(ServerRemoved message) {
+ final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
+ final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
+ if(shardInformation == null) {
+ LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
+ return;
+ } else if(shardInformation.getActor() != null) {
+ LOG.debug("{} : Sending PoisonPill to Shard actor {}", persistenceId(), shardInformation.getActor());
+ shardInformation.getActor().tell(PoisonPill.getInstance(), self());
+ }
+ LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
+ persistShardList();
+ }
+
private void onGetSnapshot() {
LOG.debug("{}: onGetSnapshot", persistenceId());
return shardName;
}
+ @Nullable
ActorRef getActor(){
return actor;
}
import akka.actor.Status;
import akka.actor.Status.Failure;
import akka.actor.Status.Success;
+import akka.actor.Terminated;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.dispatch.Dispatchers;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
primaryFound.getPrimaryPath().contains("member-1-shard-default"));
- assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
+ assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
}};
}
LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
primaryFound.getPrimaryPath().contains("member-1-shard-default"));
- assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
+ assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
}};
}
}
+ @Test
+ public void testServerRemovedShardActorNotRunning() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ MockConfiguration mockConfig =
+ new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+ put("default", Arrays.asList("member-1", "member-2")).
+ put("astronauts", Arrays.asList("member-2")).
+ put("people", Arrays.asList("member-1", "member-2")).build());
+
+ TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
+ newShardMgrProps(mockConfig));
+
+ shardManager.underlyingActor().waitForRecoveryComplete();
+
+ shardManager.tell(new FindLocalShard("people", false), getRef());
+ expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+
+ shardManager.tell(new FindLocalShard("default", false), getRef());
+ expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+
+ // Removed the default shard replica from member-1
+ ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
+ final ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type("config1").build();
+ shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
+
+ shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
+ }};
+ }
+
+ @Test
+ public void testServerRemovedShardActorRunning() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ MockConfiguration mockConfig =
+ new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+ put("default", Arrays.asList("member-1", "member-2")).
+ put("astronauts", Arrays.asList("member-2")).
+ put("people", Arrays.asList("member-1", "member-2")).build());
+
+ TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
+ newShardMgrProps(mockConfig));
+
+ TestActorRef<MessageCollectorActor> shard = TestActorRef.create(getSystem(), MessageCollectorActor.props());
+
+ watch(shard);
+
+ shardManager.underlyingActor().waitForRecoveryComplete();
+
+ shardManager.underlyingActor().addShardActor("default", shard);
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ shardManager.tell(new FindLocalShard("people", false), getRef());
+ expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+
+ shardManager.tell(new FindLocalShard("default", false), getRef());
+ expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+
+ // Removed the default shard replica from member-1
+ ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
+ final ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type("config1").build();
+ shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
+
+ shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
+
+ expectMsgClass(duration("5 seconds"), Terminated.class);
+ }};
+ }
+
+
@Test
public void testShardPersistenceWithRestoredData() throws Exception {
new JavaTestKit(getSystem()) {{
private static class TestShardManager extends ShardManager {
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
+ private final CountDownLatch snapshotPersist = new CountDownLatch(1);
+ private ShardManagerSnapshot snapshot;
+ private Map<String, ActorRef> shardActors = new HashMap<>();
private TestShardManager(Builder builder) {
super(builder);
return Props.create(TestShardManager.class, this);
}
}
+
+ @Override
+ public void saveSnapshot(Object obj) {
+ snapshot = (ShardManagerSnapshot) obj;
+ snapshotPersist.countDown();
+ }
+
+ void verifySnapshotPersisted(Set<String> shardList) {
+ assertEquals("saveSnapshot invoked", true,
+ Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
+ assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
+ }
+
+ @Override
+ protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
+ if(shardActors.get(info.getShardName()) != null){
+ return shardActors.get(info.getShardName());
+ }
+ return super.newShardActor(schemaContext, info);
+ }
+
+ public void addShardActor(String shardName, ActorRef actorRef){
+ shardActors.put(shardName, actorRef);
+ }
}
private static class DelegatingShardManagerCreator implements Creator<ShardManager> {