// Subscribe this actor to cluster member events
cluster.subscribeToMemberEvents(getSelf());
- List<String> localShardActorNames = new ArrayList<>();
- mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
- "shard-manager-" + this.type,
- datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
- localShardActorNames);
- mBean.setShardManager(this);
+ mBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(), "shard-manager-" + this.type,
+ datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
+ mBean.registerMBean();
}
@Override
persistenceId(), ((SaveSnapshotFailure) message).cause());
} else if(message instanceof Shutdown) {
onShutDown();
+ } else if (message instanceof GetLocalShardIds) {
+ onGetLocalShardIds();
} else {
unknownMessage(message);
}
if (replyMsg.getStatus() == ServerChangeStatus.OK) {
LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
shardId.getShardName());
- originalSender.tell(new akka.actor.Status.Success(null), getSelf());
+ originalSender.tell(new Status.Success(null), getSelf());
} else {
LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
persistenceId(), shardId, replyMsg.getStatus());
- Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(),
- leaderPath, shardId);
- originalSender.tell(new akka.actor.Status.Failure(failure), getSelf());
+ Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
+ originalSender.tell(new Status.Failure(failure), getSelf());
}
}
}
if(notInitialized != null) {
- getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
+ getSender().tell(new Status.Failure(new IllegalStateException(String.format(
"%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
return;
}
String shardName = createShard.getModuleShardConfig().getShardName();
if(localShards.containsKey(shardName)) {
LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
- reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
+ reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
} else {
doCreateShard(createShard);
- reply = new akka.actor.Status.Success(null);
+ reply = new Status.Success(null);
}
} catch (Exception e) {
LOG.error("{}: onCreateShard failed", persistenceId(), e);
- reply = new akka.actor.Status.Failure(e);
+ reply = new Status.Failure(e);
}
if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
info.setActiveMember(isActiveMember);
localShards.put(info.getShardName(), info);
- mBean.addLocalShard(shardId.toString());
-
if(schemaContext != null) {
info.setActor(newShardActor(schemaContext, info));
}
}
}
- private void onSwitchShardBehavior(SwitchShardBehavior message) {
- ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
+ private void onGetLocalShardIds() {
+ final List<String> response = new ArrayList<>(localShards.size());
+
+ for (ShardInformation info : localShards.values()) {
+ response.add(info.getShardId().toString());
+ }
+
+ getSender().tell(new Status.Success(response), getSelf());
+ }
+
+ private void onSwitchShardBehavior(final SwitchShardBehavior message) {
+ final ShardIdentifier identifier = message.getShardId();
- ShardInformation shardInformation = localShards.get(identifier.getShardName());
+ if (identifier != null) {
+ final ShardInformation info = localShards.get(identifier.getShardName());
+ if (info == null) {
+ getSender().tell(new Status.Failure(
+ new IllegalArgumentException("Shard " + identifier + " is not local")), getSelf());
+ return;
+ }
- if(shardInformation != null && shardInformation.getActor() != null) {
- shardInformation.getActor().tell(
- new SwitchBehavior(message.getNewState(), message.getTerm()), getSelf());
+ switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
} else {
+ for (ShardInformation info : localShards.values()) {
+ switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
+ }
+ }
+
+ getSender().tell(new Status.Success(null), getSelf());
+ }
+
+ private void switchShardBehavior(final ShardInformation info, final SwitchBehavior switchBehavior) {
+ final ActorRef actor = info.getActor();
+ if (actor != null) {
+ actor.tell(switchBehavior, getSelf());
+ } else {
LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
- message.getShardName(), message.getNewState());
+ info.shardName, switchBehavior.getNewState());
}
}
localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
shardSnapshots.get(shardName)), peerAddressResolver));
- mBean.addLocalShard(shardId.toString());
}
}
if (shardReplicaOperationsInProgress.contains(shardName)) {
String msg = String.format("A shard replica operation for %s is already in progress", shardName);
LOG.debug ("{}: {}", persistenceId(), msg);
- sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+ sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
return true;
}
if (!(this.configuration.isShardConfigured(shardName))) {
String msg = String.format("No module configuration exists for shard %s", shardName);
LOG.debug ("{}: {}", persistenceId(), msg);
- getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
+ getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf());
return;
}
String msg = String.format(
"No SchemaContext is available in order to create a local shard instance for %s", shardName);
LOG.debug ("{}: {}", persistenceId(), msg);
- getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+ getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
return;
}
private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
String msg = String.format("Local shard %s already exists", shardName);
LOG.debug ("{}: {}", persistenceId(), msg);
- sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
+ sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
}
private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
}
}
- sender.tell(new akka.actor.Status.Failure(message == null ? failure :
+ sender.tell(new Status.Failure(message == null ? failure :
new RuntimeException(message, failure)), getSelf());
}
shardInfo.setActiveMember(true);
persistShardList();
- mBean.addLocalShard(shardInfo.getShardId().toString());
- sender.tell(new akka.actor.Status.Success(null), getSelf());
+ sender.tell(new Status.Success(null), getSelf());
} else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
sendLocalReplicaAlreadyExistsReply(shardName, sender);
} else {
@Override
public void onFailure(Throwable failure) {
LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
- targetActor.tell(new akka.actor.Status.Failure(new RuntimeException(
+ targetActor.tell(new Status.Failure(new RuntimeException(
String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
}
String msg = String.format("Failed to find leader for shard %s: received response: %s",
shardName, response);
LOG.debug ("{}: {}", persistenceId, msg);
- targetActor.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable) response :
+ targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
new RuntimeException(msg)), shardManagerActor);
}
}
package org.opendaylight.controller.cluster.datastore.shardmanager;
import akka.actor.ActorRef;
+import akka.pattern.Patterns;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import java.util.Collection;
+import com.google.common.base.Throwables;
import java.util.List;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
final class ShardManagerInfo extends AbstractMXBean implements ShardManagerInfoMBean {
public static final String JMX_CATEGORY_SHARD_MANAGER = "ShardManager";
- // The only states that you can switch to from outside. You cannot switch to Candidate/IsolatedLeader for example
- private static final Collection<String> ACCEPTABLE_STATES
- = ImmutableList.of(RaftState.Leader.name(), RaftState.Follower.name());
-
private static final Logger LOG = LoggerFactory.getLogger(ShardManagerInfo.class);
+ private static final long ASK_TIMEOUT_MILLIS = 5000;
+ private final ActorRef shardManager;
private final String memberName;
- private final List<String> localShards;
- private boolean syncStatus = false;
+ private volatile boolean syncStatus = false;
- private ShardManager shardManager;
- private ShardManagerInfo(String memberName, String name, String mxBeanType, List<String> localShards) {
+ ShardManagerInfo(final ActorRef shardManager, final String memberName, final String name,
+ final String mxBeanType) {
super(name, mxBeanType, JMX_CATEGORY_SHARD_MANAGER);
- this.memberName = memberName;
- this.localShards = localShards;
- }
-
- static ShardManagerInfo createShardManagerMBean(String memberName, String name, String mxBeanType,
- List<String> localShards){
- ShardManagerInfo shardManagerInfo = new ShardManagerInfo(memberName, name, mxBeanType, localShards);
-
- shardManagerInfo.registerMBean();
-
- return shardManagerInfo;
- }
-
- public void addLocalShard(String shardName) {
- localShards.add(shardName);
+ this.shardManager = Preconditions.checkNotNull(shardManager);
+ this.memberName = Preconditions.checkNotNull(memberName);
}
+ @SuppressWarnings("unchecked")
@Override
public List<String> getLocalShards() {
- return localShards;
+ try {
+ return (List<String>) Await.result(
+ Patterns.ask(shardManager, GetLocalShardIds.INSTANCE, ASK_TIMEOUT_MILLIS), Duration.Inf());
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
}
@Override
public boolean getSyncStatus() {
- return this.syncStatus;
+ return syncStatus;
+ }
+
+ void setSyncStatus(boolean syncStatus) {
+ this.syncStatus = syncStatus;
}
@Override
return memberName;
}
- @Override
- public void switchAllLocalShardsState(String newState, long term) {
- LOG.info("switchAllLocalShardsState called newState = {}, term = {}", newState, term);
-
- for(String shardName : localShards){
- switchShardState(shardName, newState, term);
+ private void requestSwitchShardState(final ShardIdentifier shardId, final String newState, final long term) {
+ // Validates strings argument
+ final RaftState state = RaftState.valueOf(newState);
+
+ // Leader and Follower are the only states to which we can switch externally
+ switch (state) {
+ case Follower:
+ case Leader:
+ try {
+ Await.result(Patterns.ask(shardManager, new SwitchShardBehavior(shardId, state, term),
+ ASK_TIMEOUT_MILLIS), Duration.Inf());
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ break;
+ case Candidate:
+ case IsolatedLeader:
+ default:
+ throw new IllegalArgumentException("Illegal target state " + state);
}
}
@Override
- public void switchShardState(String shardName, String newState, long term) {
- LOG.info("switchShardState called shardName = {}, newState = {}, term = {}", shardName, newState, term);
-
- Preconditions.checkArgument(localShards.contains(shardName), shardName + " is not local");
- Preconditions.checkArgument(ACCEPTABLE_STATES.contains(newState));
-
- shardManager.getSelf().tell(new SwitchShardBehavior(shardName, RaftState.valueOf(newState), term),
- ActorRef.noSender());
- }
-
- public void setSyncStatus(boolean syncStatus){
- this.syncStatus = syncStatus;
+ public void switchAllLocalShardsState(String newState, long term) {
+ LOG.info("switchAllLocalShardsState called newState = {}, term = {}", newState, term);
+ requestSwitchShardState(null, newState, term);
}
- public void setShardManager(ShardManager shardManager){
- this.shardManager = shardManager;
+ @Override
+ public void switchShardState(String shardId, String newState, long term) {
+ final ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(
+ Preconditions.checkNotNull(shardId, "Shard id may not be null")).build();
+ LOG.info("switchShardState called shardName = {}, newState = {}, term = {}", shardId, newState, term);
+ requestSwitchShardState(identifier, newState, term);
}
}