import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
- private final String id;
+ private final String persistenceId;
/**
*/
this.primaryShardInfoCache = builder.primaryShardInfoCache;
this.restoreFromSnapshot = builder.restoreFromSnapshot;
- id = "shard-manager-" + type;
+ String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
+ persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
onCreateShard((CreateShard)message);
} else if(message instanceof AddShardReplica){
onAddShardReplica((AddShardReplica)message);
+ } else if(message instanceof ForwardedAddServerReply) {
+ ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
+ onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
+ msg.removeShardOnFailure);
+ } else if(message instanceof ForwardedAddServerFailure) {
+ ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
+ onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
+ } else if(message instanceof ForwardedAddServerPrimaryShardFound) {
+ ForwardedAddServerPrimaryShardFound msg = (ForwardedAddServerPrimaryShardFound)message;
+ addShard(msg.shardName, msg.primaryFound, getSender());
} else if(message instanceof RemoveShardReplica){
onRemoveShardReplica((RemoveShardReplica)message);
} else if(message instanceof GetSnapshot) {
onGetSnapshot();
+ } else if(message instanceof ServerRemoved){
+ onShardReplicaRemoved((ServerRemoved) message);
} else if (message instanceof SaveSnapshotSuccess) {
- LOG.debug ("{} saved ShardManager snapshot successfully", persistenceId());
+ LOG.debug("{} saved ShardManager snapshot successfully", persistenceId());
} else if (message instanceof SaveSnapshotFailure) {
LOG.error ("{}: SaveSnapshotFailure received for saving snapshot of shards",
persistenceId(), ((SaveSnapshotFailure)message).cause());
} else {
unknownMessage(message);
}
+ }
+ private void onShardReplicaRemoved(ServerRemoved message) {
+ final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
+ final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
+ if(shardInformation == null) {
+ LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
+ return;
+ } else if(shardInformation.getActor() != null) {
+ LOG.debug("{} : Sending PoisonPill to Shard actor {}", persistenceId(), shardInformation.getActor());
+ shardInformation.getActor().tell(PoisonPill.getInstance(), self());
+ }
+ LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
+ persistShardList();
}
private void onGetSnapshot() {
@Override
public String persistenceId() {
- return id;
+ return persistenceId;
}
@VisibleForTesting
private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
final String shardName = shardReplicaMsg.getShardName();
- LOG.debug ("onAddShardReplica: {}", shardReplicaMsg);
+ LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
// verify the shard with the specified name is present in the cluster configuration
if (!(this.configuration.isShardConfigured(shardName))) {
String.format("Failed to find leader for shard %s", shardName), failure)), getSelf());
} else {
if(response instanceof RemotePrimaryShardFound) {
- RemotePrimaryShardFound message = (RemotePrimaryShardFound)response;
- addShard (shardName, message, sender);
+ self().tell(new ForwardedAddServerPrimaryShardFound(shardName,
+ (RemotePrimaryShardFound)response), sender);
} else if(response instanceof LocalPrimaryShardFound) {
sendLocalReplicaAlreadyExistsReply(shardName, sender);
} else {
futureObj.onComplete(new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object addServerResponse) {
- shardReplicaOperationsInProgress.remove(shardName);
if (failure != null) {
LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
response.getPrimaryPath(), shardName, failure);
- onAddServerFailure(shardName, String.format("AddServer request to leader %s for shard %s failed",
- response.getPrimaryPath(), shardName), failure, sender, removeShardOnFailure);
+ String msg = String.format("AddServer request to leader %s for shard %s failed",
+ response.getPrimaryPath(), shardName);
+ self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
} else {
- AddServerReply reply = (AddServerReply)addServerResponse;
- onAddServerReply(shardInfo, reply, sender, response.getPrimaryPath(), removeShardOnFailure);
+ self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
+ response.getPrimaryPath(), removeShardOnFailure), sender);
}
}
}, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
boolean removeShardOnFailure) {
+ shardReplicaOperationsInProgress.remove(shardName);
+
if(removeShardOnFailure) {
ShardInformation shardInfo = localShards.remove(shardName);
if (shardInfo.getActor() != null) {
private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
String leaderPath, boolean removeShardOnFailure) {
String shardName = shardInfo.getShardName();
+ shardReplicaOperationsInProgress.remove(shardName);
+
LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
if (replyMsg.getStatus() == ServerChangeStatus.OK) {
}
}
+ private static class ForwardedAddServerPrimaryShardFound {
+ String shardName;
+ RemotePrimaryShardFound primaryFound;
+
+ ForwardedAddServerPrimaryShardFound(String shardName, RemotePrimaryShardFound primaryFound) {
+ this.shardName = shardName;
+ this.primaryFound = primaryFound;
+ }
+ }
+
+ private static class ForwardedAddServerReply {
+ ShardInformation shardInfo;
+ AddServerReply addServerReply;
+ String leaderPath;
+ boolean removeShardOnFailure;
+
+ ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
+ boolean removeShardOnFailure) {
+ this.shardInfo = shardInfo;
+ this.addServerReply = addServerReply;
+ this.leaderPath = leaderPath;
+ this.removeShardOnFailure = removeShardOnFailure;
+ }
+ }
+
+ private static class ForwardedAddServerFailure {
+ String shardName;
+ String failureMessage;
+ Throwable failure;
+ boolean removeShardOnFailure;
+
+ ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
+ boolean removeShardOnFailure) {
+ this.shardName = shardName;
+ this.failureMessage = failureMessage;
+ this.failure = failure;
+ this.removeShardOnFailure = removeShardOnFailure;
+ }
+ }
+
@VisibleForTesting
protected static class ShardInformation {
private final ShardIdentifier shardId;
return shardName;
}
+ @Nullable
ActorRef getActor(){
return actor;
}