@Override
public void removePeer(String name) {
- peerInfoMap.remove(name);
+ if(getId().equals(name)) {
+ votingMember = false;
+ } else {
+ peerInfoMap.remove(name);
+ }
}
@Override public ActorSelection getPeerActorSelection(String peerId) {
private void onWrappedShardResponse(WrappedShardResponse message) {
if (message.getResponse() instanceof RemoveServerReply) {
- onRemoveServerReply(getSender(), message.getShardName(), (RemoveServerReply) message.getResponse());
+ onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
+ message.getLeaderPath());
}
}
- private void onRemoveServerReply(ActorRef originalSender, String shardName, RemoveServerReply response) {
- shardReplicaOperationsInProgress.remove(shardName);
- originalSender.tell(new Status.Success(null), self());
+ private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
+ String leaderPath) {
+ shardReplicaOperationsInProgress.remove(shardId);
+
+ LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
+
+ if (replyMsg.getStatus() == ServerChangeStatus.OK) {
+ LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
+ shardId.getShardName());
+ originalSender.tell(new akka.actor.Status.Success(null), getSelf());
+ } else {
+ LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
+ persistenceId(), shardId, replyMsg.getStatus());
+
+ Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(),
+ leaderPath, shardId);
+ originalSender.tell(new akka.actor.Status.Failure(failure), getSelf());
+ }
}
private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
} else {
// SUCCESS
- self().tell(new WrappedShardResponse(shardName, response), sender);
+ self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
}
}
}, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
return;
} else if(shardInformation.getActor() != null) {
- LOG.debug("{} : Sending PoisonPill to Shard actor {}", persistenceId(), shardInformation.getActor());
- shardInformation.getActor().tell(PoisonPill.getInstance(), self());
+ LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
+ shardInformation.getActor().tell(new Shutdown(), self());
}
LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
persistShardList();
* The WrappedShardResponse class wraps a response from a Shard.
*/
private static class WrappedShardResponse {
- private final String shardName;
+ private final ShardIdentifier shardId;
private final Object response;
+ private final String leaderPath;
- private WrappedShardResponse(String shardName, Object response) {
- this.shardName = shardName;
+ private WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
+ this.shardId = shardId;
this.response = response;
+ this.leaderPath = leaderPath;
}
- String getShardName() {
- return shardName;
+ ShardIdentifier getShardId() {
+ return shardId;
}
Object getResponse() {
return response;
}
+
+ String getLeaderPath() {
+ return leaderPath;
+ }
}
}
import akka.actor.Status;
import akka.actor.Status.Failure;
import akka.actor.Status.Success;
-import akka.actor.Terminated;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.dispatch.Dispatchers;
TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props());
- watch(shard);
-
shardManager.underlyingActor().waitForRecoveryComplete();
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
- expectMsgClass(duration("5 seconds"), Terminated.class);
+ MessageCollectorActor.expectFirstMatching(shard, Shutdown.class);
}};
LOG.info("testServerRemovedShardActorRunning ending");
*/
package org.opendaylight.controller.cluster.datastore.admin;
+import static org.hamcrest.CoreMatchers.anyOf;
+import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent;
import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
verifyNoShardPresent(replicaNode2.configDataStore(), "cars");
}
+ @Test
+ public void testRemoveShardLeaderReplica() throws Exception {
+ String name = "testRemoveShardLeaderReplica";
+ String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+ MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+ moduleShardsConfig(moduleShardsConfig).
+ datastoreContextBuilder(DatastoreContext.newBuilder().
+ shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+ MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ leaderNode1.configDataStore().waitTillReady();
+ verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
+ verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
+ verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
+
+ replicaNode2.waitForMembersUp("member-1", "member-3");
+ replicaNode2.waitForMembersUp("member-1", "member-2");
+
+ // Invoke RPC service on leader member-1 to remove it's local shard
+
+ ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
+ leaderNode1.operDataStore());
+
+ RpcResult<Void> rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().
+ setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build()).
+ get(10, TimeUnit.SECONDS);
+ verifySuccessfulRpcResult(rpcResult);
+ service1.close();
+
+ verifyRaftState(replicaNode2.configDataStore(), "cars", new RaftStateVerifier() {
+ @Override
+ public void verify(OnDemandRaftState raftState) {
+ assertThat("Leader Id", raftState.getLeader(), anyOf(containsString("member-2"),
+ containsString("member-3")));
+ }
+ });
+
+ verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-3");
+ verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-2");
+ verifyNoShardPresent(leaderNode1.configDataStore(), "cars");
+ }
+
@Test
public void testAddReplicasForAllShards() throws Exception {
String name = "testAddReplicasForAllShards";