import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
-import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
private DatastoreSnapshot restoreFromSnapshot;
+ private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
+
+ private final String id;
+
/**
*/
protected ShardManager(Builder builder) {
this.primaryShardInfoCache = builder.primaryShardInfoCache;
this.restoreFromSnapshot = builder.restoreFromSnapshot;
+ id = "shard-manager-" + type;
+
peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
// Subscribe this actor to cluster member events
private void onCreateShard(CreateShard createShard) {
Object reply;
try {
- ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
- if(localShards.containsKey(moduleShardConfig.getShardName())) {
- throw new IllegalStateException(String.format("Shard with name %s already exists",
- moduleShardConfig.getShardName()));
+ String shardName = createShard.getModuleShardConfig().getShardName();
+ if(localShards.containsKey(shardName)) {
+ reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
+ } else {
+ doCreateShard(createShard);
+ reply = new akka.actor.Status.Success(null);
}
+ } catch (Exception e) {
+ LOG.error("onCreateShard failed", e);
+ reply = new akka.actor.Status.Failure(e);
+ }
- configuration.addModuleShardConfiguration(moduleShardConfig);
+ if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
+ getSender().tell(reply, getSelf());
+ }
+ }
- ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), moduleShardConfig.getShardName());
- Map<String, String> peerAddresses = getPeerAddresses(moduleShardConfig.getShardName()/*,
- moduleShardConfig.getShardMemberNames()*/);
+ private void doCreateShard(CreateShard createShard) {
+ ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
+ String shardName = moduleShardConfig.getShardName();
- LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
- moduleShardConfig.getShardMemberNames(), peerAddresses);
+ configuration.addModuleShardConfiguration(moduleShardConfig);
- DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
- if(shardDatastoreContext == null) {
- shardDatastoreContext = newShardDatastoreContext(moduleShardConfig.getShardName());
- } else {
- shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
- peerAddressResolver).build();
- }
+ DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
+ if(shardDatastoreContext == null) {
+ shardDatastoreContext = newShardDatastoreContext(shardName);
+ } else {
+ shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
+ peerAddressResolver).build();
+ }
- ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses,
- shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
- localShards.put(info.getShardName(), info);
+ ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
- mBean.addLocalShard(shardId.toString());
+ Map<String, String> peerAddresses;
+ boolean isActiveMember;
+ if(configuration.getMembersFromShardName(shardName).contains(cluster.getCurrentMemberName())) {
+ peerAddresses = getPeerAddresses(shardName);
+ isActiveMember = true;
+ } else {
+ // The local member is not in the given shard member configuration. In this case we'll create
+ // the shard with no peers and with elections disabled so it stays as follower. A
+ // subsequent AddServer request will be needed to make it an active member.
+ isActiveMember = false;
+ peerAddresses = Collections.emptyMap();
+ shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
+ customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
+ }
- if(schemaContext != null) {
- info.setActor(newShardActor(schemaContext, info));
- }
+ LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
+ moduleShardConfig.getShardMemberNames(), peerAddresses);
- reply = new CreateShardReply();
- } catch (Exception e) {
- LOG.error("onCreateShard failed", e);
- reply = new akka.actor.Status.Failure(e);
- }
+ ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
+ shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
+ info.setActiveMember(isActiveMember);
+ localShards.put(info.getShardName(), info);
- if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
- getSender().tell(reply, getSelf());
+ mBean.addLocalShard(shardId.toString());
+
+ if(schemaContext != null) {
+ info.setActor(newShardActor(schemaContext, info));
}
}
// First see if the there is a local replica for the shard
final ShardInformation info = localShards.get(shardName);
- if (info != null) {
+ if (info != null && info.isActiveMember()) {
sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
@Override
public Object get() {
@Override
public String persistenceId() {
- return "shard-manager-" + type;
+ return id;
}
@VisibleForTesting
return mBean;
}
- private void checkLocalShardExists(final String shardName, final ActorRef sender) {
- if (localShards.containsKey(shardName)) {
- String msg = String.format("Local shard %s already exists", shardName);
+ private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
+ if (shardReplicaOperationsInProgress.contains(shardName)) {
+ String msg = String.format("A shard replica operation for %s is already in progress", shardName);
LOG.debug ("{}: {}", persistenceId(), msg);
- sender.tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
+ sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+ return true;
}
+
+ return false;
}
private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
final String shardName = shardReplicaMsg.getShardName();
- // verify the local shard replica is already available in the controller node
LOG.debug ("onAddShardReplica: {}", shardReplicaMsg);
- checkLocalShardExists(shardName, getSender());
-
// verify the shard with the specified name is present in the cluster configuration
if (!(this.configuration.isShardConfigured(shardName))) {
String msg = String.format("No module configuration exists for shard %s", shardName);
return;
}
- Map<String, String> peerAddresses = getPeerAddresses(shardName);
- if (peerAddresses.isEmpty()) {
- String msg = String.format("Cannot add replica for shard %s because no peer is available", shardName);
- LOG.debug ("{}: {}", persistenceId(), msg);
- getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
- return;
- }
-
Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
getShardInitializationTimeout().duration().$times(2));
final ActorRef sender = getSender();
- Future<Object> futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true), findPrimaryTimeout);
+ Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
futureObj.onComplete(new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object response) {
if (failure != null) {
LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId(), shardName, failure);
sender.tell(new akka.actor.Status.Failure(new RuntimeException(
- String.format("Failed to find leader for shard %s", shardName), failure)),
- getSelf());
+ String.format("Failed to find leader for shard %s", shardName), failure)), getSelf());
} else {
- if (!(response instanceof RemotePrimaryShardFound)) {
+ if(response instanceof RemotePrimaryShardFound) {
+ RemotePrimaryShardFound message = (RemotePrimaryShardFound)response;
+ addShard (shardName, message, sender);
+ } else if(response instanceof LocalPrimaryShardFound) {
+ sendLocalReplicaAlreadyExistsReply(shardName, sender);
+ } else {
String msg = String.format("Failed to find leader for shard %s: received response: %s",
shardName, response);
LOG.debug ("{}: {}", persistenceId(), msg);
- sender.tell(new akka.actor.Status.Failure(new RuntimeException(msg)), getSelf());
- return;
+ sender.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable)response :
+ new RuntimeException(msg)), getSelf());
}
-
- RemotePrimaryShardFound message = (RemotePrimaryShardFound)response;
- addShard (shardName, message, sender);
}
}
}, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
}
+ private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
+ String msg = String.format("Local shard %s already exists", shardName);
+ LOG.debug ("{}: {}", persistenceId(), msg);
+ sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
+ }
+
private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
- checkLocalShardExists(shardName, sender);
+ if(isShardReplicaOperationInProgress(shardName, sender)) {
+ return;
+ }
- ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
- String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
+ shardReplicaOperationsInProgress.add(shardName);
+
+ final ShardInformation shardInfo;
+ final boolean removeShardOnFailure;
+ ShardInformation existingShardInfo = localShards.get(shardName);
+ if(existingShardInfo == null) {
+ removeShardOnFailure = true;
+ ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+
+ DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
+ DisableElectionsRaftPolicy.class.getName()).build();
- DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
- DisableElectionsRaftPolicy.class.getName()).build();
+ shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
+ Shard.builder(), peerAddressResolver);
+ shardInfo.setActiveMember(false);
+ localShards.put(shardName, shardInfo);
+ shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+ } else {
+ removeShardOnFailure = false;
+ shardInfo = existingShardInfo;
+ }
- final ShardInformation shardInfo = new ShardInformation(shardName, shardId,
- getPeerAddresses(shardName), datastoreContext,
- Shard.builder(), peerAddressResolver);
- shardInfo.setShardActiveMember(false);
- localShards.put(shardName, shardInfo);
- shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+ String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
//inform ShardLeader to add this shard as a replica by sending an AddServer message
LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
- response.getPrimaryPath(), shardId);
+ response.getPrimaryPath(), shardInfo.getShardId());
- Timeout addServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(4));
+ Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
+ duration());
Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
- new AddServer(shardId.toString(), localShardAddress, true), addServerTimeout);
+ new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
futureObj.onComplete(new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object addServerResponse) {
+ shardReplicaOperationsInProgress.remove(shardName);
if (failure != null) {
LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
response.getPrimaryPath(), shardName, failure);
- // Remove the shard
- localShards.remove(shardName);
- if (shardInfo.getActor() != null) {
- shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
- }
-
- sender.tell(new akka.actor.Status.Failure(new RuntimeException(
- String.format("AddServer request to leader %s for shard %s failed",
- response.getPrimaryPath(), shardName), failure)), getSelf());
+ onAddServerFailure(shardName, String.format("AddServer request to leader %s for shard %s failed",
+ response.getPrimaryPath(), shardName), failure, sender, removeShardOnFailure);
} else {
AddServerReply reply = (AddServerReply)addServerResponse;
- onAddServerReply(shardName, shardInfo, reply, sender, response.getPrimaryPath());
+ onAddServerReply(shardInfo, reply, sender, response.getPrimaryPath(), removeShardOnFailure);
}
}
- }, new Dispatchers(context().system().dispatchers()).
- getDispatcher(Dispatchers.DispatcherType.Client));
- return;
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
+ private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
+ boolean removeShardOnFailure) {
+ if(removeShardOnFailure) {
+ ShardInformation shardInfo = localShards.remove(shardName);
+ if (shardInfo.getActor() != null) {
+ shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
+ }
+ }
+
+ sender.tell(new akka.actor.Status.Failure(message == null ? failure :
+ new RuntimeException(message, failure)), getSelf());
}
- private void onAddServerReply (String shardName, ShardInformation shardInfo,
- AddServerReply replyMsg, ActorRef sender, String leaderPath) {
+ private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
+ String leaderPath, boolean removeShardOnFailure) {
+ String shardName = shardInfo.getShardName();
LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
if (replyMsg.getStatus() == ServerChangeStatus.OK) {
// Make the local shard voting capable
shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
- shardInfo.setShardActiveMember(true);
+ shardInfo.setActiveMember(true);
persistShardList();
mBean.addLocalShard(shardInfo.getShardId().toString());
- sender.tell(new akka.actor.Status.Success(true), getSelf());
+ sender.tell(new akka.actor.Status.Success(null), getSelf());
+ } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
+ sendLocalReplicaAlreadyExistsReply(shardName, sender);
} else {
- LOG.warn ("{}: Leader failed to add shard replica {} with status {} - removing the local shard",
+ LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
persistenceId(), shardName, replyMsg.getStatus());
- //remove the local replica created
- localShards.remove(shardName);
- if (shardInfo.getActor() != null) {
- shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
- }
+ Exception failure;
switch (replyMsg.getStatus()) {
case TIMEOUT:
- sender.tell(new akka.actor.Status.Failure(new RuntimeException(
- String.format("The shard leader %s timed out trying to replicate the initial data to the new shard %s. Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
- leaderPath, shardName))), getSelf());
+ failure = new TimeoutException(String.format(
+ "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
+ "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
+ leaderPath, shardName));
break;
case NO_LEADER:
- sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
- "There is no shard leader available for shard %s", shardName))), getSelf());
+ failure = createNoShardLeaderException(shardInfo.getShardId());
break;
default :
- sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
- "AddServer request to leader %s for shard %s failed with status %s",
- leaderPath, shardName, replyMsg.getStatus()))), getSelf());
+ failure = new RuntimeException(String.format(
+ "AddServer request to leader %s for shard %s failed with status %s",
+ leaderPath, shardName, replyMsg.getStatus()));
}
+
+ onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
}
}
}
private void persistShardList() {
- List<String> shardList = new ArrayList(localShards.keySet());
+ List<String> shardList = new ArrayList<>(localShards.keySet());
for (ShardInformation shardInfo : localShards.values()) {
- if (!shardInfo.isShardActiveMember()) {
+ if (!shardInfo.isActiveMember()) {
shardList.remove(shardInfo.getShardName());
}
}
private DatastoreContext datastoreContext;
private Shard.AbstractBuilder<?, ?> builder;
private final ShardPeerAddressResolver addressResolver;
- private boolean shardActiveStatus = true;
+ private boolean isActiveMember = true;
private ShardInformation(String shardName, ShardIdentifier shardId,
Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
this.leaderVersion = leaderVersion;
}
- void setShardActiveMember(boolean flag) {
- shardActiveStatus = flag;
+ boolean isActiveMember() {
+ return isActiveMember;
}
- boolean isShardActiveMember() {
- return shardActiveStatus;
+ void setActiveMember(boolean isActiveMember) {
+ this.isActiveMember = isActiveMember;
}
}