import akka.actor.Cancellable;
import akka.actor.OneForOneStrategy;
import akka.actor.PoisonPill;
-import akka.actor.Props;
import akka.actor.Status;
import akka.actor.SupervisorStrategy;
+import akka.actor.SupervisorStrategy.Directive;
import akka.cluster.ClusterEvent;
+import akka.cluster.Member;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import akka.japi.Function;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotOffer;
import akka.persistence.SnapshotSelectionCriteria;
-import akka.serialization.Serialization;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Sets;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
import org.apache.commons.lang3.SerializationUtils;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
-import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
-import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
ShardManager(AbstractShardManagerCreator<?> builder) {
this.cluster = builder.getCluster();
this.configuration = builder.getConfiguration();
- this.datastoreContextFactory = builder.getDdatastoreContextFactory();
+ this.datastoreContextFactory = builder.getDatastoreContextFactory();
this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
this.shardDispatcherPath =
new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
// Subscribe this actor to cluster member events
cluster.subscribeToMemberEvents(getSelf());
- List<String> localShardActorNames = new ArrayList<>();
- mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
- "shard-manager-" + this.type,
- datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
- localShardActorNames);
- mBean.setShardManager(this);
+ mBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(), "shard-manager-" + this.type,
+ datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
+ mBean.registerMBean();
}
@Override
} else if(message instanceof ForwardedAddServerFailure) {
ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
- } else if(message instanceof PrimaryShardFoundForContext) {
- PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
- onPrimaryShardFoundContext(primaryShardFoundContext);
} else if(message instanceof RemoveShardReplica) {
onRemoveShardReplica((RemoveShardReplica) message);
} else if(message instanceof WrappedShardResponse){
onGetSnapshot();
} else if(message instanceof ServerRemoved){
onShardReplicaRemoved((ServerRemoved) message);
+ } else if(message instanceof ChangeShardMembersVotingStatus){
+ onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
+ } else if(message instanceof FlipShardMembersVotingStatus){
+ onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
} else if(message instanceof SaveSnapshotSuccess) {
onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
} else if(message instanceof SaveSnapshotFailure) {
persistenceId(), ((SaveSnapshotFailure) message).cause());
} else if(message instanceof Shutdown) {
onShutDown();
+ } else if (message instanceof GetLocalShardIds) {
+ onGetLocalShardIds();
+ } else if(message instanceof RunnableMessage) {
+ ((RunnableMessage)message).run();
} else {
unknownMessage(message);
}
if (replyMsg.getStatus() == ServerChangeStatus.OK) {
LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
shardId.getShardName());
- originalSender.tell(new akka.actor.Status.Success(null), getSelf());
+ originalSender.tell(new Status.Success(null), getSelf());
} else {
LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
persistenceId(), shardId, replyMsg.getStatus());
- Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(),
- leaderPath, shardId);
- originalSender.tell(new akka.actor.Status.Failure(failure), getSelf());
- }
- }
-
- private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
- if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
- addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
- getSender());
- } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
- removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
- primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
+ Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
+ originalSender.tell(new Status.Failure(failure), getSelf());
}
}
@Override
public void onComplete(Throwable failure, Object response) {
if (failure != null) {
+ shardReplicaOperationsInProgress.remove(shardName);
String msg = String.format("RemoveServer request to leader %s for shard %s failed",
primaryPath, shardName);
}
if(notInitialized != null) {
- getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
+ getSender().tell(new Status.Failure(new IllegalStateException(String.format(
"%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
return;
}
String shardName = createShard.getModuleShardConfig().getShardName();
if(localShards.containsKey(shardName)) {
LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
- reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
+ reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
} else {
doCreateShard(createShard);
- reply = new akka.actor.Status.Success(null);
+ reply = new Status.Success(null);
}
} catch (Exception e) {
LOG.error("{}: onCreateShard failed", persistenceId(), e);
- reply = new akka.actor.Status.Failure(e);
+ reply = new Status.Failure(e);
}
if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
info.setActiveMember(isActiveMember);
localShards.put(info.getShardName(), info);
- mBean.addLocalShard(shardId.toString());
-
if(schemaContext != null) {
info.setActor(newShardActor(schemaContext, info));
}
if(!shardInfo.isShardInitialized()) {
LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
- message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
+ message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
} else {
LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
- message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
+ message.getSender().tell(createNoShardLeaderException(shardInfo.getShardId()), getSelf());
}
}
String actorName = sender.path().name();
//find shard name from actor name; actor name is stringified shardId
- ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
- if (shardId.getShardName() == null) {
+ final ShardIdentifier shardId;
+ try {
+ shardId = ShardIdentifier.fromShardIdString(actorName);
+ } catch (IllegalArgumentException e) {
+ LOG.debug("{}: ignoring actor {}", actorName, e);
return;
}
return;
}
- sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
- @Override
- public Object get() {
- return new LocalShardFound(shardInformation.getActor());
- }
- });
+ sendResponse(shardInformation, message.isWaitUntilInitialized(), false, () -> new LocalShardFound(shardInformation.getActor()));
}
private void sendResponse(ShardInformation shardInformation, boolean doWait,
final ActorRef sender = getSender();
final ActorRef self = self();
- Runnable replyRunnable = new Runnable() {
- @Override
- public void run() {
- sender.tell(messageSupplier.get(), self);
- }
- };
+ Runnable replyRunnable = () -> sender.tell(messageSupplier.get(), self);
OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
new OnShardInitialized(replyRunnable);
} else if (!shardInformation.isShardInitialized()) {
LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
shardInformation.getShardName());
- getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
+ getSender().tell(createNotInitializedException(shardInformation.getShardId()), getSelf());
} else {
LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
shardInformation.getShardName());
- getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
+ getSender().tell(createNoShardLeaderException(shardInformation.getShardId()), getSelf());
}
return;
"Found primary shard %s but it's not initialized yet. Please try again later", shardId));
}
+ @VisibleForTesting
+ static MemberName memberToName(final Member member) {
+ return MemberName.forName(member.roles().iterator().next());
+ }
+
private void memberRemoved(ClusterEvent.MemberRemoved message) {
- String memberName = message.member().roles().iterator().next();
+ MemberName memberName = memberToName(message.member());
LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
message.member().address());
}
private void memberExited(ClusterEvent.MemberExited message) {
- String memberName = message.member().roles().iterator().next();
+ MemberName memberName = memberToName(message.member());
LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
message.member().address());
}
private void memberUp(ClusterEvent.MemberUp message) {
- String memberName = message.member().roles().iterator().next();
+ MemberName memberName = memberToName(message.member());
LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
message.member().address());
checkReady();
}
- private void addPeerAddress(String memberName, Address address) {
+ private void addPeerAddress(MemberName memberName, Address address) {
peerAddressResolver.addPeerAddress(memberName, address);
for(ShardInformation info : localShards.values()){
}
private void memberReachable(ClusterEvent.ReachableMember message) {
- String memberName = message.member().roles().iterator().next();
+ MemberName memberName = memberToName(message.member());
LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
addPeerAddress(memberName, message.member().address());
}
private void memberUnreachable(ClusterEvent.UnreachableMember message) {
- String memberName = message.member().roles().iterator().next();
+ MemberName memberName = memberToName(message.member());
LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
markMemberUnavailable(memberName);
}
- private void markMemberUnavailable(final String memberName) {
- for(ShardInformation info : localShards.values()){
+ private void markMemberUnavailable(final MemberName memberName) {
+ final String memberStr = memberName.getName();
+ for (ShardInformation info : localShards.values()) {
String leaderId = info.getLeaderId();
- if(leaderId != null && leaderId.contains(memberName)) {
+ // XXX: why are we using String#contains() here?
+ if (leaderId != null && leaderId.contains(memberStr)) {
LOG.debug("Marking Leader {} as unavailable.", leaderId);
info.setLeaderAvailable(false);
}
}
- private void markMemberAvailable(final String memberName) {
- for(ShardInformation info : localShards.values()){
+ private void markMemberAvailable(final MemberName memberName) {
+ final String memberStr = memberName.getName();
+ for (ShardInformation info : localShards.values()) {
String leaderId = info.getLeaderId();
- if(leaderId != null && leaderId.contains(memberName)) {
+ // XXX: why are we using String#contains() here?
+ if (leaderId != null && leaderId.contains(memberStr)) {
LOG.debug("Marking Leader {} as available.", leaderId);
info.setLeaderAvailable(true);
}
}
}
- private void onSwitchShardBehavior(SwitchShardBehavior message) {
- ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
+ private void onGetLocalShardIds() {
+ final List<String> response = new ArrayList<>(localShards.size());
+
+ for (ShardInformation info : localShards.values()) {
+ response.add(info.getShardId().toString());
+ }
+
+ getSender().tell(new Status.Success(response), getSelf());
+ }
+
+ private void onSwitchShardBehavior(final SwitchShardBehavior message) {
+ final ShardIdentifier identifier = message.getShardId();
- ShardInformation shardInformation = localShards.get(identifier.getShardName());
+ if (identifier != null) {
+ final ShardInformation info = localShards.get(identifier.getShardName());
+ if (info == null) {
+ getSender().tell(new Status.Failure(
+ new IllegalArgumentException("Shard " + identifier + " is not local")), getSelf());
+ return;
+ }
- if(shardInformation != null && shardInformation.getActor() != null) {
- shardInformation.getActor().tell(
- new SwitchBehavior(message.getNewState(), message.getTerm()), getSelf());
+ switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
} else {
+ for (ShardInformation info : localShards.values()) {
+ switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
+ }
+ }
+
+ getSender().tell(new Status.Success(null), getSelf());
+ }
+
+ private void switchShardBehavior(final ShardInformation info, final SwitchBehavior switchBehavior) {
+ final ActorRef actor = info.getActor();
+ if (actor != null) {
+ actor.tell(switchBehavior, getSelf());
+ } else {
LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
- message.getShardName(), message.getNewState());
+ info.getShardName(), switchBehavior.getNewState());
}
}
// First see if the there is a local replica for the shard
final ShardInformation info = localShards.get(shardName);
if (info != null && info.isActiveMember()) {
- sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
- @Override
- public Object get() {
- String primaryPath = info.getSerializedLeaderActor();
- Object found = canReturnLocalShardState && info.isLeader() ?
- new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
- new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
- }
+ sendResponse(info, message.isWaitUntilReady(), true, () -> {
+ String primaryPath = info.getSerializedLeaderActor();
+ Object found = canReturnLocalShardState && info.isLeader() ?
+ new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
+ new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
+ }
- return found;
- }
+ return found;
});
return;
* @param shardName
* @return
*/
- private ShardIdentifier getShardIdentifier(String memberName, String shardName){
+ private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName){
return peerAddressResolver.getShardIdentifier(memberName, shardName);
}
*
*/
private void createLocalShards() {
- String memberName = this.cluster.getCurrentMemberName();
+ MemberName memberName = this.cluster.getCurrentMemberName();
Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
shardSnapshots.get(shardName)), peerAddressResolver));
- mBean.addLocalShard(shardId.toString());
}
}
* @param shardName
*/
private Map<String, String> getPeerAddresses(String shardName) {
- Collection<String> members = configuration.getMembersFromShardName(shardName);
+ Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
Map<String, String> peerAddresses = new HashMap<>();
- String currentMemberName = this.cluster.getCurrentMemberName();
+ MemberName currentMemberName = this.cluster.getCurrentMemberName();
- for(String memberName : members) {
- if(!currentMemberName.equals(memberName)) {
+ for (MemberName memberName : members) {
+ if (!currentMemberName.equals(memberName)) {
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
peerAddresses.put(shardId.toString(), address);
public SupervisorStrategy supervisorStrategy() {
return new OneForOneStrategy(10, Duration.create("1 minute"),
- new Function<Throwable, SupervisorStrategy.Directive>() {
- @Override
- public SupervisorStrategy.Directive apply(Throwable t) {
- LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
- return SupervisorStrategy.resume();
- }
- }
+ (Function<Throwable, Directive>) t -> {
+ LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
+ return SupervisorStrategy.resume();
+ }
);
}
if (shardReplicaOperationsInProgress.contains(shardName)) {
String msg = String.format("A shard replica operation for %s is already in progress", shardName);
LOG.debug ("{}: {}", persistenceId(), msg);
- sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+ sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
return true;
}
if (!(this.configuration.isShardConfigured(shardName))) {
String msg = String.format("No module configuration exists for shard %s", shardName);
LOG.debug ("{}: {}", persistenceId(), msg);
- getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
+ getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf());
return;
}
String msg = String.format(
"No SchemaContext is available in order to create a local shard instance for %s", shardName);
LOG.debug ("{}: {}", persistenceId(), msg);
- getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+ getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
return;
}
findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
@Override
public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
- getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+ getSelf().tell(new RunnableMessage() {
+ @Override
+ public void run() {
+ addShard(getShardName(), response, getSender());
+ }
+ }, getTargetActor());
}
@Override
private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
String msg = String.format("Local shard %s already exists", shardName);
LOG.debug ("{}: {}", persistenceId(), msg);
- sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
+ sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
}
private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
}
}
- sender.tell(new akka.actor.Status.Failure(message == null ? failure :
+ sender.tell(new Status.Failure(message == null ? failure :
new RuntimeException(message, failure)), getSelf());
}
shardInfo.setActiveMember(true);
persistShardList();
- mBean.addLocalShard(shardInfo.getShardId().toString());
- sender.tell(new akka.actor.Status.Success(null), getSelf());
+ sender.tell(new Status.Success(null), getSelf());
} else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
sendLocalReplicaAlreadyExistsReply(shardName, sender);
} else {
shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
@Override
public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
- getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+ doRemoveShardReplicaAsync(response.getPrimaryPath());
}
@Override
public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
- getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+ doRemoveShardReplicaAsync(response.getPrimaryPath());
+ }
+
+ private void doRemoveShardReplicaAsync(final String primaryPath) {
+ getSelf().tell(new RunnableMessage() {
+ @Override
+ public void run() {
+ removeShardReplica(shardReplicaMsg, getShardName(), primaryPath, getSender());
+ }
+ }, getTargetActor());
}
});
}
LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
- String currentMember = cluster.getCurrentMemberName();
+ final MemberName currentMember = cluster.getCurrentMemberName();
Set<String> configuredShardList =
new HashSet<>(configuration.getMemberShardNames(currentMember));
for (String shard : currentSnapshot.getShardList()) {
0, 0));
}
- private static class ForwardedAddServerReply {
- ShardInformation shardInfo;
- AddServerReply addServerReply;
- String leaderPath;
- boolean removeShardOnFailure;
+ private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) {
+ LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus);
- ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
- boolean removeShardOnFailure) {
- this.shardInfo = shardInfo;
- this.addServerReply = addServerReply;
- this.leaderPath = leaderPath;
- this.removeShardOnFailure = removeShardOnFailure;
+ String shardName = changeMembersVotingStatus.getShardName();
+ Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
+ for(Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
+ serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(),
+ e.getValue());
}
- }
- private static class ForwardedAddServerFailure {
- String shardName;
- String failureMessage;
- Throwable failure;
- boolean removeShardOnFailure;
+ ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
- ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
- boolean removeShardOnFailure) {
- this.shardName = shardName;
- this.failureMessage = failureMessage;
- this.failure = failure;
- this.removeShardOnFailure = removeShardOnFailure;
- }
+ findLocalShard(shardName, getSender(),
+ localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
+ localShardFound.getPath(), getSender()));
}
- @VisibleForTesting
- protected static class ShardInformation {
- private final ShardIdentifier shardId;
- private final String shardName;
- private ActorRef actor;
- private final Map<String, String> initialPeerAddresses;
- private Optional<DataTree> localShardDataTree;
- private boolean leaderAvailable = false;
-
- // flag that determines if the actor is ready for business
- private boolean actorInitialized = false;
-
- private boolean followerSyncStatus = false;
-
- private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
- private String role ;
- private String leaderId;
- private short leaderVersion;
+ private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) {
+ LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
- private DatastoreContext datastoreContext;
- private Shard.AbstractBuilder<?, ?> builder;
- private final ShardPeerAddressResolver addressResolver;
- private boolean isActiveMember = true;
+ ActorRef sender = getSender();
+ final String shardName = flipMembersVotingStatus.getShardName();
+ findLocalShard(shardName, sender, localShardFound -> {
+ Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
+ Timeout.apply(30, TimeUnit.SECONDS));
- private ShardInformation(String shardName, ShardIdentifier shardId,
- Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
- Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
- this.shardName = shardName;
- this.shardId = shardId;
- this.initialPeerAddresses = initialPeerAddresses;
- this.datastoreContext = datastoreContext;
- this.builder = builder;
- this.addressResolver = addressResolver;
- }
-
- Props newProps(SchemaContext schemaContext) {
- Preconditions.checkNotNull(builder);
- Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
- schemaContext(schemaContext).props();
- builder = null;
- return props;
- }
-
- String getShardName() {
- return shardName;
- }
-
- @Nullable
- ActorRef getActor(){
- return actor;
- }
-
- void setActor(ActorRef actor) {
- this.actor = actor;
- }
-
- ShardIdentifier getShardId() {
- return shardId;
- }
-
- void setLocalDataTree(Optional<DataTree> localShardDataTree) {
- this.localShardDataTree = localShardDataTree;
- }
-
- Optional<DataTree> getLocalShardDataTree() {
- return localShardDataTree;
- }
-
- DatastoreContext getDatastoreContext() {
- return datastoreContext;
- }
+ future.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ if (failure != null) {
+ sender.tell(new Status.Failure(new RuntimeException(
+ String.format("Failed to access local shard %s", shardName), failure)), self());
+ return;
+ }
- void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
- this.datastoreContext = datastoreContext;
- if (actor != null) {
- LOG.debug ("Sending new DatastoreContext to {}", shardId);
- actor.tell(this.datastoreContext, sender);
- }
- }
+ OnDemandRaftState raftState = (OnDemandRaftState) response;
+ Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
+ for(Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
+ serverVotingStatusMap.put(e.getKey(), !e.getValue());
+ }
- void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
- LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
+ serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName).
+ toString(), !raftState.isVoting());
- if(actor != null) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
- peerId, peerAddress, actor.path());
+ changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
+ shardName, localShardFound.getPath(), sender);
}
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ });
- actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
- }
-
- notifyOnShardInitializedCallbacks();
- }
-
- void peerDown(String memberName, String peerId, ActorRef sender) {
- if(actor != null) {
- actor.tell(new PeerDown(memberName, peerId), sender);
- }
- }
-
- void peerUp(String memberName, String peerId, ActorRef sender) {
- if(actor != null) {
- actor.tell(new PeerUp(memberName, peerId), sender);
- }
- }
-
- boolean isShardReady() {
- return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
- }
-
- boolean isShardReadyWithLeaderId() {
- return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
- (isLeader() || addressResolver.resolve(leaderId) != null);
- }
-
- boolean isShardInitialized() {
- return getActor() != null && actorInitialized;
- }
-
- boolean isLeader() {
- return Objects.equal(leaderId, shardId.toString());
- }
-
- String getSerializedLeaderActor() {
- if(isLeader()) {
- return Serialization.serializedActorPath(getActor());
- } else {
- return addressResolver.resolve(leaderId);
- }
- }
-
- void setActorInitialized() {
- LOG.debug("Shard {} is initialized", shardId);
-
- this.actorInitialized = true;
-
- notifyOnShardInitializedCallbacks();
- }
-
- private void notifyOnShardInitializedCallbacks() {
- if(onShardInitializedSet.isEmpty()) {
- return;
- }
-
- boolean ready = isShardReadyWithLeaderId();
+ }
- if(LOG.isDebugEnabled()) {
- LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
- ready ? "ready" : "initialized", onShardInitializedSet.size());
- }
+ private void findLocalShard(final String shardName, final ActorRef sender,
+ final Consumer<LocalShardFound> onLocalShardFound) {
+ Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
+ getShardInitializationTimeout().duration().$times(2));
- Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
- while(iter.hasNext()) {
- OnShardInitialized onShardInitialized = iter.next();
- if(!(onShardInitialized instanceof OnShardReady) || ready) {
- iter.remove();
- onShardInitialized.getTimeoutSchedule().cancel();
- onShardInitialized.getReplyRunnable().run();
+ Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ if (failure != null) {
+ LOG.debug ("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName, failure);
+ sender.tell(new Status.Failure(new RuntimeException(
+ String.format("Failed to find local shard %s", shardName), failure)), self());
+ } else {
+ if(response instanceof LocalShardFound) {
+ getSelf().tell(new RunnableMessage() {
+ @Override
+ public void run() {
+ onLocalShardFound.accept((LocalShardFound) response);
+ }
+ }, sender);
+ } else if(response instanceof LocalShardNotFound) {
+ String msg = String.format("Local shard %s does not exist", shardName);
+ LOG.debug ("{}: {}", persistenceId, msg);
+ sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self());
+ } else {
+ String msg = String.format("Failed to find local shard %s: received response: %s",
+ shardName, response);
+ LOG.debug ("{}: {}", persistenceId, msg);
+ sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
+ new RuntimeException(msg)), self());
+ }
}
}
- }
-
- void addOnShardInitialized(OnShardInitialized onShardInitialized) {
- onShardInitializedSet.add(onShardInitialized);
- }
-
- void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
- onShardInitializedSet.remove(onShardInitialized);
- }
-
- void setRole(String newRole) {
- this.role = newRole;
-
- notifyOnShardInitializedCallbacks();
- }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
- void setFollowerSyncStatus(boolean syncStatus){
- this.followerSyncStatus = syncStatus;
+ private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus,
+ final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
+ if(isShardReplicaOperationInProgress(shardName, sender)) {
+ return;
}
- boolean isInSync(){
- if(RaftState.Follower.name().equals(this.role)){
- return followerSyncStatus;
- } else if(RaftState.Leader.name().equals(this.role)){
- return true;
- }
-
- return false;
- }
+ shardReplicaOperationsInProgress.add(shardName);
- boolean setLeaderId(String leaderId) {
- boolean changed = !Objects.equal(this.leaderId, leaderId);
- this.leaderId = leaderId;
- if(leaderId != null) {
- this.leaderAvailable = true;
- }
- notifyOnShardInitializedCallbacks();
+ DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
+ final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
- return changed;
- }
+ LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(),
+ changeServersVotingStatus, shardActorRef.path());
- String getLeaderId() {
- return leaderId;
- }
+ Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
+ Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
- void setLeaderAvailable(boolean leaderAvailable) {
- this.leaderAvailable = leaderAvailable;
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ shardReplicaOperationsInProgress.remove(shardName);
+ if (failure != null) {
+ String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
+ shardActorRef.path());
+ LOG.debug ("{}: {}", persistenceId(), msg, failure);
+ sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+ } else {
+ LOG.debug ("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
+
+ ServerChangeReply replyMsg = (ServerChangeReply) response;
+ if(replyMsg.getStatus() == ServerChangeStatus.OK) {
+ LOG.debug ("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
+ sender.tell(new Status.Success(null), getSelf());
+ } else if(replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
+ sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
+ "The requested voting state change for shard %s is invalid. At least one member must be voting",
+ shardId.getShardName()))), getSelf());
+ } else {
+ LOG.warn ("{}: ChangeServersVotingStatus failed for shard {} with status {}",
+ persistenceId(), shardName, replyMsg.getStatus());
- if(leaderAvailable) {
- notifyOnShardInitializedCallbacks();
+ Exception error = getServerChangeException(ChangeServersVotingStatus.class,
+ replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
+ sender.tell(new Status.Failure(error), getSelf());
+ }
+ }
}
- }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
- short getLeaderVersion() {
- return leaderVersion;
- }
+ private static final class ForwardedAddServerReply {
+ ShardInformation shardInfo;
+ AddServerReply addServerReply;
+ String leaderPath;
+ boolean removeShardOnFailure;
- void setLeaderVersion(short leaderVersion) {
- this.leaderVersion = leaderVersion;
+ ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
+ boolean removeShardOnFailure) {
+ this.shardInfo = shardInfo;
+ this.addServerReply = addServerReply;
+ this.leaderPath = leaderPath;
+ this.removeShardOnFailure = removeShardOnFailure;
}
+ }
- boolean isActiveMember() {
- return isActiveMember;
- }
+ private static final class ForwardedAddServerFailure {
+ String shardName;
+ String failureMessage;
+ Throwable failure;
+ boolean removeShardOnFailure;
- void setActiveMember(boolean isActiveMember) {
- this.isActiveMember = isActiveMember;
+ ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
+ boolean removeShardOnFailure) {
+ this.shardName = shardName;
+ this.failureMessage = failureMessage;
+ this.failure = failure;
+ this.removeShardOnFailure = removeShardOnFailure;
}
}
- private static class OnShardInitialized {
+ static class OnShardInitialized {
private final Runnable replyRunnable;
private Cancellable timeoutSchedule;
}
}
- private static class OnShardReady extends OnShardInitialized {
+ static class OnShardReady extends OnShardInitialized {
OnShardReady(Runnable replyRunnable) {
super(replyRunnable);
}
}
- private static class ShardNotInitializedTimeout {
- private final ActorRef sender;
- private final ShardInformation shardInfo;
- private final OnShardInitialized onShardInitialized;
-
- ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
- this.sender = sender;
- this.shardInfo = shardInfo;
- this.onShardInitialized = onShardInitialized;
- }
-
- ActorRef getSender() {
- return sender;
- }
-
- ShardInformation getShardInfo() {
- return shardInfo;
- }
-
- OnShardInitialized getOnShardInitialized() {
- return onShardInitialized;
- }
- }
-
private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
getShardInitializationTimeout().duration().$times(2));
}, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
}
+ private static interface RunnableMessage extends Runnable {
+ }
+
/**
* The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
* a remote or local find primary message is processed
@Override
public void onFailure(Throwable failure) {
LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
- targetActor.tell(new akka.actor.Status.Failure(new RuntimeException(
+ targetActor.tell(new Status.Failure(new RuntimeException(
String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
}
String msg = String.format("Failed to find leader for shard %s: received response: %s",
shardName, response);
LOG.debug ("{}: {}", persistenceId, msg);
- targetActor.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable) response :
+ targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
new RuntimeException(msg)), shardManagerActor);
}
}
-
- /**
- * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
- * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
- * as a successful response to find primary.
- */
- private static class PrimaryShardFoundForContext {
- private final String shardName;
- private final Object contextMessage;
- private final RemotePrimaryShardFound remotePrimaryShardFound;
- private final LocalPrimaryShardFound localPrimaryShardFound;
-
- public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage,
- @Nonnull Object primaryFoundMessage) {
- this.shardName = Preconditions.checkNotNull(shardName);
- this.contextMessage = Preconditions.checkNotNull(contextMessage);
- Preconditions.checkNotNull(primaryFoundMessage);
- this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ?
- (RemotePrimaryShardFound) primaryFoundMessage : null;
- this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ?
- (LocalPrimaryShardFound) primaryFoundMessage : null;
- }
-
- @Nonnull
- String getPrimaryPath(){
- if(remotePrimaryShardFound != null) {
- return remotePrimaryShardFound.getPrimaryPath();
- }
- return localPrimaryShardFound.getPrimaryPath();
- }
-
- @Nonnull
- Object getContextMessage() {
- return contextMessage;
- }
-
- @Nullable
- RemotePrimaryShardFound getRemotePrimaryShardFound() {
- return remotePrimaryShardFound;
- }
-
- @Nonnull
- String getShardName() {
- return shardName;
- }
- }
-
/**
* The WrappedShardResponse class wraps a response from a Shard.
*/
- private static class WrappedShardResponse {
+ private static final class WrappedShardResponse {
private final ShardIdentifier shardId;
private final Object response;
private final String leaderPath;
- private WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
+ WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
this.shardId = shardId;
this.response = response;
this.leaderPath = leaderPath;
return leaderPath;
}
}
+
+ private static final class ShardNotInitializedTimeout {
+ private final ActorRef sender;
+ private final ShardInformation shardInfo;
+ private final OnShardInitialized onShardInitialized;
+
+ ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
+ this.sender = sender;
+ this.shardInfo = shardInfo;
+ this.onShardInitialized = onShardInitialized;
+ }
+
+ ActorRef getSender() {
+ return sender;
+ }
+
+ ShardInformation getShardInfo() {
+ return shardInfo;
+ }
+
+ OnShardInitialized getOnShardInitialized() {
+ return onShardInitialized;
+ }
+ }
}