import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
-import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
private DatastoreSnapshot restoreFromSnapshot;
+ private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
+
+ private final String persistenceId;
+
/**
*/
protected ShardManager(Builder builder) {
this.primaryShardInfoCache = builder.primaryShardInfoCache;
this.restoreFromSnapshot = builder.restoreFromSnapshot;
+ String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
+ persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
+
peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
// Subscribe this actor to cluster member events
onCreateShard((CreateShard)message);
} else if(message instanceof AddShardReplica){
onAddShardReplica((AddShardReplica)message);
+ } else if(message instanceof ForwardedAddServerReply) {
+ ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
+ onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
+ msg.removeShardOnFailure);
+ } else if(message instanceof ForwardedAddServerFailure) {
+ ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
+ onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
+ } else if(message instanceof ForwardedAddServerPrimaryShardFound) {
+ ForwardedAddServerPrimaryShardFound msg = (ForwardedAddServerPrimaryShardFound)message;
+ addShard(msg.shardName, msg.primaryFound, getSender());
} else if(message instanceof RemoveShardReplica){
onRemoveShardReplica((RemoveShardReplica)message);
} else if(message instanceof GetSnapshot) {
onGetSnapshot();
+ } else if(message instanceof ServerRemoved){
+ onShardReplicaRemoved((ServerRemoved) message);
} else if (message instanceof SaveSnapshotSuccess) {
- LOG.debug ("{} saved ShardManager snapshot successfully", persistenceId());
+ LOG.debug("{} saved ShardManager snapshot successfully", persistenceId());
} else if (message instanceof SaveSnapshotFailure) {
LOG.error ("{}: SaveSnapshotFailure received for saving snapshot of shards",
persistenceId(), ((SaveSnapshotFailure)message).cause());
} else {
unknownMessage(message);
}
+ }
+ /**
+ * Handles a {@link ServerRemoved} message: removes the matching entry from {@code localShards},
+ * sends a PoisonPill to its shard actor when one is set, and then calls {@code persistShardList()}.
+ *
+ * @param message the removal notification; its server id is parsed as a shard identifier string
+ */
+ private void onShardReplicaRemoved(ServerRemoved message) {
+ final ShardIdentifier removedId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
+ final String removedShardName = removedId.getShardName();
+ final ShardInformation removedInfo = localShards.remove(removedShardName);
+ if(removedInfo == null) {
+ // No entry was registered under that shard name, so there is nothing to stop or persist.
+ LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), removedId.toString());
+ return;
+ }
+
+ final ActorRef shardActor = removedInfo.getActor();
+ if(shardActor != null) {
+ LOG.debug("{} : Sending PoisonPill to Shard actor {}", persistenceId(), shardActor);
+ shardActor.tell(PoisonPill.getInstance(), self());
+ }
+
+ LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), removedShardName);
+ persistShardList();
}
private void onGetSnapshot() {
/**
 * Handles a {@link CreateShard} message and replies to the sender with an akka Status message.
 *
 * <p>If a shard with the requested name is already present in {@code localShards}, a
 * {@code Status.Success} carrying an explanatory string is returned; otherwise the shard is created
 * via {@code doCreateShard} and {@code Status.Success(null)} is returned. Any exception is logged
 * and turned into a {@code Status.Failure}.
 *
 * @param createShard the request describing the shard to create
 */
private void onCreateShard(CreateShard createShard) {
Object reply;
try {
- ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
- if(localShards.containsKey(moduleShardConfig.getShardName())) {
- throw new IllegalStateException(String.format("Shard with name %s already exists",
- moduleShardConfig.getShardName()));
+ String shardName = createShard.getModuleShardConfig().getShardName();
+ if(localShards.containsKey(shardName)) {
+ reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
+ } else {
+ doCreateShard(createShard);
+ reply = new akka.actor.Status.Success(null);
}
+ } catch (Exception e) {
+ LOG.error("onCreateShard failed", e);
+ reply = new akka.actor.Status.Failure(e);
+ }
- configuration.addModuleShardConfiguration(moduleShardConfig);
// Only reply when there is a real sender, i.e. not null and not the dead letters actor.
+ if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
+ getSender().tell(reply, getSelf());
+ }
+ }
- ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), moduleShardConfig.getShardName());
- Map<String, String> peerAddresses = getPeerAddresses(moduleShardConfig.getShardName()/*,
- moduleShardConfig.getShardMemberNames()*/);
/**
 * Creates and registers a new local shard described by the given {@link CreateShard} request.
 *
 * <p>The module shard configuration is added to {@code configuration}, a datastore context is
 * derived (a new one when the request supplies none, otherwise a copy of the supplied one with this
 * manager's peer address resolver), and a {@code ShardInformation} is stored in {@code localShards}
 * and registered with {@code mBean}. The shard actor is created only when {@code schemaContext}
 * is already set.
 *
 * @param createShard the request describing the shard to create
 */
+ private void doCreateShard(CreateShard createShard) {
+ ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
+ String shardName = moduleShardConfig.getShardName();
- LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
- moduleShardConfig.getShardMemberNames(), peerAddresses);
+ configuration.addModuleShardConfiguration(moduleShardConfig);
- DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
- if(shardDatastoreContext == null) {
- shardDatastoreContext = newShardDatastoreContext(moduleShardConfig.getShardName());
- } else {
- shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
- peerAddressResolver).build();
- }
+ DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
+ if(shardDatastoreContext == null) {
+ shardDatastoreContext = newShardDatastoreContext(shardName);
+ } else {
+ shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
+ peerAddressResolver).build();
+ }
- ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses,
- shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
- localShards.put(info.getShardName(), info);
+ ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
- mBean.addLocalShard(shardId.toString());
+ Map<String, String> peerAddresses;
+ boolean isActiveMember;
// The local member counts as active only if the shard's configured member list contains it.
+ if(configuration.getMembersFromShardName(shardName).contains(cluster.getCurrentMemberName())) {
+ peerAddresses = getPeerAddresses(shardName);
+ isActiveMember = true;
+ } else {
+ // The local member is not in the given shard member configuration. In this case we'll create
+ // the shard with no peers and with elections disabled so it stays as follower. A
+ // subsequent AddServer request will be needed to make it an active member.
+ isActiveMember = false;
+ peerAddresses = Collections.emptyMap();
+ shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
+ customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
+ }
- if(schemaContext != null) {
- info.setActor(newShardActor(schemaContext, info));
- }
+ LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
+ moduleShardConfig.getShardMemberNames(), peerAddresses);
- reply = new CreateShardReply();
- } catch (Exception e) {
- LOG.error("onCreateShard failed", e);
- reply = new akka.actor.Status.Failure(e);
- }
+ ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
+ shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
+ info.setActiveMember(isActiveMember);
+ localShards.put(info.getShardName(), info);
- if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
- getSender().tell(reply, getSelf());
+ mBean.addLocalShard(shardId.toString());
+
// The shard actor can only be created once a schema context is available.
+ if(schemaContext != null) {
+ info.setActor(newShardActor(schemaContext, info));
}
}
// First see if the there is a local replica for the shard
final ShardInformation info = localShards.get(shardName);
- if (info != null) {
+ if (info != null && info.isActiveMember()) {
sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
@Override
public Object get() {
/**
 * Returns the persistence id of this actor, i.e. the value of the {@code persistenceId} field.
 *
 * @return the persistence id string
 */
@Override
public String persistenceId() {
- return "shard-manager-" + type;
+ return persistenceId;
}
@VisibleForTesting
return mBean;
}
- private void checkLocalShardExists(final String shardName, final ActorRef sender) {
- if (localShards.containsKey(shardName)) {
- String msg = String.format("Local shard %s already exists", shardName);
/**
 * Checks whether a replica operation for the given shard is recorded in
 * {@code shardReplicaOperationsInProgress}. When one is, a {@code Status.Failure} wrapping an
 * {@link IllegalStateException} is sent to {@code sender}.
 *
 * @param shardName the name of the shard to check
 * @param sender the actor to notify when an operation is already in progress
 * @return {@code true} if an operation is in progress (and the failure was sent), {@code false} otherwise
 */
+ private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
+ if (shardReplicaOperationsInProgress.contains(shardName)) {
+ String msg = String.format("A shard replica operation for %s is already in progress", shardName);
LOG.debug ("{}: {}", persistenceId(), msg);
- sender.tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
+ sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+ return true;
}
+
+ return false;
}
/**
 * Handles an {@link AddShardReplica} request by asking this actor for the shard's primary and then
 * acting on the asynchronous answer.
 *
 * <p>A remote primary is forwarded back to this actor as a
 * {@code ForwardedAddServerPrimaryShardFound} message (keeping the original sender); a local primary
 * results in an "already exists" failure reply; any other response, or a failed ask, results in a
 * {@code Status.Failure} reply to the original sender.
 *
 * @param shardReplicaMsg the request naming the shard for which a local replica should be added
 */
private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
final String shardName = shardReplicaMsg.getShardName();
- // verify the local shard replica is already available in the controller node
- LOG.debug ("onAddShardReplica: {}", shardReplicaMsg);
-
- checkLocalShardExists(shardName, getSender());
+ LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
// verify the shard with the specified name is present in the cluster configuration
if (!(this.configuration.isShardConfigured(shardName))) {
return;
}
- Map<String, String> peerAddresses = getPeerAddresses(shardName);
- if (peerAddresses.isEmpty()) {
- String msg = String.format("Cannot add replica for shard %s because no peer is available", shardName);
- LOG.debug ("{}: {}", persistenceId(), msg);
- getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
- return;
- }
-
// The ask timeout is twice the configured shard initialization timeout.
Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
getShardInitializationTimeout().duration().$times(2));
// Capture the sender now; the callback below runs later, outside this message's processing.
final ActorRef sender = getSender();
- Future<Object> futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true), findPrimaryTimeout);
+ Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
futureObj.onComplete(new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object response) {
if (failure != null) {
LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId(), shardName, failure);
sender.tell(new akka.actor.Status.Failure(new RuntimeException(
- String.format("Failed to find leader for shard %s", shardName), failure)),
- getSelf());
+ String.format("Failed to find leader for shard %s", shardName), failure)), getSelf());
} else {
- if (!(response instanceof RemotePrimaryShardFound)) {
+ if(response instanceof RemotePrimaryShardFound) {
+ self().tell(new ForwardedAddServerPrimaryShardFound(shardName,
+ (RemotePrimaryShardFound)response), sender);
+ } else if(response instanceof LocalPrimaryShardFound) {
+ sendLocalReplicaAlreadyExistsReply(shardName, sender);
+ } else {
String msg = String.format("Failed to find leader for shard %s: received response: %s",
shardName, response);
LOG.debug ("{}: {}", persistenceId(), msg);
- sender.tell(new akka.actor.Status.Failure(new RuntimeException(msg)), getSelf());
- return;
+ sender.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable)response :
+ new RuntimeException(msg)), getSelf());
}
-
- RemotePrimaryShardFound message = (RemotePrimaryShardFound)response;
- addShard (shardName, message, sender);
}
}
}, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
}
+ private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
+ String msg = String.format("Local shard %s already exists", shardName);
+ LOG.debug ("{}: {}", persistenceId(), msg);
+ sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
+ }
+
private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
- checkLocalShardExists(shardName, sender);
+ if(isShardReplicaOperationInProgress(shardName, sender)) {
+ return;
+ }
- ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
- String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
+ shardReplicaOperationsInProgress.add(shardName);
- DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
- DisableElectionsRaftPolicy.class.getName()).build();
+ final ShardInformation shardInfo;
+ final boolean removeShardOnFailure;
+ ShardInformation existingShardInfo = localShards.get(shardName);
+ if(existingShardInfo == null) {
+ removeShardOnFailure = true;
+ ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
- final ShardInformation shardInfo = new ShardInformation(shardName, shardId,
- getPeerAddresses(shardName), datastoreContext,
- Shard.builder(), peerAddressResolver);
- shardInfo.setShardActiveMember(false);
- localShards.put(shardName, shardInfo);
- shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+ DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
+ DisableElectionsRaftPolicy.class.getName()).build();
+
+ shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
+ Shard.builder(), peerAddressResolver);
+ shardInfo.setActiveMember(false);
+ localShards.put(shardName, shardInfo);
+ shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+ } else {
+ removeShardOnFailure = false;
+ shardInfo = existingShardInfo;
+ }
+
+ String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
//inform ShardLeader to add this shard as a replica by sending an AddServer message
LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
- response.getPrimaryPath(), shardId);
+ response.getPrimaryPath(), shardInfo.getShardId());
- Timeout addServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(4));
+ Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
+ duration());
Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
- new AddServer(shardId.toString(), localShardAddress, true), addServerTimeout);
+ new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
futureObj.onComplete(new OnComplete<Object>() {
@Override
LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
response.getPrimaryPath(), shardName, failure);
- // Remove the shard
- localShards.remove(shardName);
- if (shardInfo.getActor() != null) {
- shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
- }
-
- sender.tell(new akka.actor.Status.Failure(new RuntimeException(
- String.format("AddServer request to leader %s for shard %s failed",
- response.getPrimaryPath(), shardName), failure)), getSelf());
+ String msg = String.format("AddServer request to leader %s for shard %s failed",
+ response.getPrimaryPath(), shardName);
+ self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
} else {
- AddServerReply reply = (AddServerReply)addServerResponse;
- onAddServerReply(shardName, shardInfo, reply, sender, response.getPrimaryPath());
+ self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
+ response.getPrimaryPath(), removeShardOnFailure), sender);
}
}
- }, new Dispatchers(context().system().dispatchers()).
- getDispatcher(Dispatchers.DispatcherType.Client));
- return;
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
+ /**
+ * Completes a failed AddServer attempt: clears the in-progress marker for the shard, optionally
+ * removes the local shard and stops its actor, and replies to {@code sender} with a
+ * {@code Status.Failure}.
+ *
+ * @param shardName the name of the shard the AddServer request was for
+ * @param message optional context message; when {@code null} the {@code failure} is sent as-is,
+ * otherwise it is wrapped in a {@link RuntimeException} carrying this message
+ * @param failure the underlying cause of the failure
+ * @param sender the actor that receives the failure reply
+ * @param removeShardOnFailure whether the entry in {@code localShards} should be removed
+ */
+ private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
+ boolean removeShardOnFailure) {
+ shardReplicaOperationsInProgress.remove(shardName);
+
+ if(removeShardOnFailure) {
+ ShardInformation shardInfo = localShards.remove(shardName);
+ // The entry may already be gone, e.g. removed by onShardReplicaRemoved while the AddServer
+ // request was in flight; guard against null so the sender still receives its failure reply.
+ if (shardInfo != null && shardInfo.getActor() != null) {
+ shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
+ }
+ }
+
+ sender.tell(new akka.actor.Status.Failure(message == null ? failure :
+ new RuntimeException(message, failure)), getSelf());
}
+ /**
+ * Processes the leader's {@link AddServerReply} for a previously sent AddServer request.
+ *
+ * <p>The in-progress marker for the shard is cleared first. On {@code OK} the shard gets a fresh
+ * datastore context, is marked as an active member, the shard list is persisted, the shard is
+ * registered with {@code mBean} and {@code Status.Success(null)} is sent to {@code sender}. On
+ * {@code ALREADY_EXISTS} an "already exists" failure is sent. Any other status is converted to an
+ * exception and handed to {@code onAddServerFailure}.
+ *
+ * @param shardInfo the local shard the request was issued for
+ * @param replyMsg the reply received from the shard leader
+ * @param sender the actor that receives the final reply
+ * @param leaderPath the path of the leader the request was sent to (used in messages only)
+ * @param removeShardOnFailure passed through to {@code onAddServerFailure}
+ */
- private void onAddServerReply (String shardName, ShardInformation shardInfo,
- AddServerReply replyMsg, ActorRef sender, String leaderPath) {
+ private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
+ String leaderPath, boolean removeShardOnFailure) {
+ String shardName = shardInfo.getShardName();
+ shardReplicaOperationsInProgress.remove(shardName);
+
LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
if (replyMsg.getStatus() == ServerChangeStatus.OK) {
// Make the local shard voting capable
shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
- shardInfo.setShardActiveMember(true);
+ shardInfo.setActiveMember(true);
persistShardList();
mBean.addLocalShard(shardInfo.getShardId().toString());
- sender.tell(new akka.actor.Status.Success(true), getSelf());
+ sender.tell(new akka.actor.Status.Success(null), getSelf());
+ } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
+ sendLocalReplicaAlreadyExistsReply(shardName, sender);
} else {
- LOG.warn ("{}: Leader failed to add shard replica {} with status {} - removing the local shard",
+ LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
persistenceId(), shardName, replyMsg.getStatus());
- //remove the local replica created
- localShards.remove(shardName);
- if (shardInfo.getActor() != null) {
- shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
- }
+ Exception failure;
switch (replyMsg.getStatus()) {
case TIMEOUT:
- sender.tell(new akka.actor.Status.Failure(new RuntimeException(
- String.format("The shard leader %s timed out trying to replicate the initial data to the new shard %s. Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
- leaderPath, shardName))), getSelf());
+ // Keep the trailing space: the two literals are concatenated into a single sentence pair.
+ failure = new TimeoutException(String.format(
+ "The shard leader %s timed out trying to replicate the initial data to the new shard %s. " +
+ "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
+ leaderPath, shardName));
break;
case NO_LEADER:
- sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
- "There is no shard leader available for shard %s", shardName))), getSelf());
+ failure = createNoShardLeaderException(shardInfo.getShardId());
break;
default :
- sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
- "AddServer request to leader %s for shard %s failed with status %s",
- leaderPath, shardName, replyMsg.getStatus()))), getSelf());
+ failure = new RuntimeException(String.format(
+ "AddServer request to leader %s for shard %s failed with status %s",
+ leaderPath, shardName, replyMsg.getStatus()));
}
+
+ // A null message makes onAddServerFailure send the exception itself rather than a wrapper.
+ onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
}
}
}
private void persistShardList() {
- List<String> shardList = new ArrayList(localShards.keySet());
+ List<String> shardList = new ArrayList<>(localShards.keySet());
for (ShardInformation shardInfo : localShards.values()) {
- if (!shardInfo.isShardActiveMember()) {
+ if (!shardInfo.isActiveMember()) {
shardList.remove(shardInfo.getShardName());
}
}
}
}
+ /**
+ * Internal message this actor sends to itself when the FindPrimary ask issued by
+ * {@code onAddShardReplica} yields a remote primary, so that {@code addShard} is invoked from the
+ * actor's message handler. Fields are final because instances are passed as actor messages.
+ */
+ private static class ForwardedAddServerPrimaryShardFound {
+ /** Name of the shard for which a replica is being added. */
+ final String shardName;
+ /** The remote primary that was found for the shard. */
+ final RemotePrimaryShardFound primaryFound;
+
+ ForwardedAddServerPrimaryShardFound(String shardName, RemotePrimaryShardFound primaryFound) {
+ this.shardName = shardName;
+ this.primaryFound = primaryFound;
+ }
+ }
+
+ /**
+ * Internal message this actor sends to itself when an AddServer ask completes with a reply, so
+ * that {@code onAddServerReply} is invoked from the actor's message handler. Fields are final
+ * because instances are passed as actor messages.
+ */
+ private static class ForwardedAddServerReply {
+ /** The local shard the AddServer request was issued for. */
+ final ShardInformation shardInfo;
+ /** The reply received from the shard leader. */
+ final AddServerReply addServerReply;
+ /** Path of the leader the request was sent to. */
+ final String leaderPath;
+ /** Whether the local shard should be removed if the reply indicates a failure. */
+ final boolean removeShardOnFailure;
+
+ ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
+ boolean removeShardOnFailure) {
+ this.shardInfo = shardInfo;
+ this.addServerReply = addServerReply;
+ this.leaderPath = leaderPath;
+ this.removeShardOnFailure = removeShardOnFailure;
+ }
+ }
+
+ /**
+ * Internal message this actor sends to itself when an AddServer ask fails, so that
+ * {@code onAddServerFailure} is invoked from the actor's message handler. Fields are final
+ * because instances are passed as actor messages.
+ */
+ private static class ForwardedAddServerFailure {
+ /** Name of the shard the AddServer request was for. */
+ final String shardName;
+ /** Context message describing the failed request. */
+ final String failureMessage;
+ /** The underlying cause reported by the failed ask. */
+ final Throwable failure;
+ /** Whether the local shard should be removed as part of failure handling. */
+ final boolean removeShardOnFailure;
+
+ ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
+ boolean removeShardOnFailure) {
+ this.shardName = shardName;
+ this.failureMessage = failureMessage;
+ this.failure = failure;
+ this.removeShardOnFailure = removeShardOnFailure;
+ }
+ }
+
@VisibleForTesting
protected static class ShardInformation {
private final ShardIdentifier shardId;
private DatastoreContext datastoreContext;
private Shard.AbstractBuilder<?, ?> builder;
private final ShardPeerAddressResolver addressResolver;
- private boolean shardActiveStatus = true;
+ private boolean isActiveMember = true;
private ShardInformation(String shardName, ShardIdentifier shardId,
Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
return shardName;
}
+ @Nullable
ActorRef getActor(){
return actor;
}
this.leaderVersion = leaderVersion;
}
- void setShardActiveMember(boolean flag) {
- shardActiveStatus = flag;
+ boolean isActiveMember() {
+ return isActiveMember;
}
- boolean isShardActiveMember() {
- return shardActiveStatus;
+ void setActiveMember(boolean isActiveMember) {
+ this.isActiveMember = isActiveMember;
}
}