import akka.dispatch.OnComplete;
import akka.japi.Function;
import akka.persistence.RecoveryCompleted;
+import akka.persistence.SaveSnapshotFailure;
+import akka.persistence.SaveSnapshotSuccess;
+import akka.persistence.SnapshotOffer;
import akka.serialization.Serialization;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
private final String shardDispatcherPath;
- private ShardManagerInfo mBean;
+ private final ShardManagerInfo mBean;
private DatastoreContextFactory datastoreContextFactory;
// Subscribe this actor to cluster member events
cluster.subscribeToMemberEvents(getSelf());
- createLocalShards();
+ List<String> localShardActorNames = new ArrayList<>();
+ mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
+ "shard-manager-" + this.type,
+ datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
+ localShardActorNames);
+ mBean.setShardManager(this);
}
@Override
onRemoveShardReplica((RemoveShardReplica)message);
} else if(message instanceof GetSnapshot) {
onGetSnapshot();
+ } else if (message instanceof SaveSnapshotSuccess) {
+ LOG.debug ("{} saved ShardManager snapshot successfully", persistenceId());
+ } else if (message instanceof SaveSnapshotFailure) {
+ LOG.error ("{}: SaveSnapshotFailure received for saving snapshot of shards",
+ persistenceId(), ((SaveSnapshotFailure)message).cause());
} else {
unknownMessage(message);
}
// We no longer persist SchemaContext modules so delete all the prior messages from the akka
// journal on upgrade from Helium.
deleteMessages(lastSequenceNr());
+ createLocalShards();
+ } else if (message instanceof SnapshotOffer) {
+ handleShardRecovery((SnapshotOffer) message);
}
}
restoreFromSnapshot = null; // null out to GC
- List<String> localShardActorNames = new ArrayList<>();
for(String shardName : memberShardNames){
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
Map<String, String> peerAddresses = getPeerAddresses(shardName);
- localShardActorNames.add(shardId.toString());
localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
shardSnapshots.get(shardName)), peerAddressResolver));
+ mBean.addLocalShard(shardId.toString());
}
-
- mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
- datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(), localShardActorNames);
-
- mBean.setShardManager(this);
}
/**
final ShardInformation shardInfo = new ShardInformation(shardName, shardId,
getPeerAddresses(shardName), datastoreContext,
Shard.builder(), peerAddressResolver);
+ shardInfo.setShardActiveMember(false);
localShards.put(shardName, shardInfo);
shardInfo.setActor(newShardActor(schemaContext, shardInfo));
// Make the local shard voting capable
shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
+ shardInfo.setShardActiveMember(true);
+ persistShardList();
mBean.addLocalShard(shardInfo.getShardId().toString());
sender.tell(new akka.actor.Status.Success(true), getSelf());
return;
}
+ /**
+  * Persists (via an akka snapshot) the names of the shards for which this member is an
+  * active replica, so that replicas added or removed at runtime survive a restart and
+  * can be reconciled in handleShardRecovery on the next SnapshotOffer.
+  */
+ private void persistShardList() {
+     // Fixed raw-type usage: was "new ArrayList(...)" which compiles with an
+     // unchecked warning; the diamond form is type-safe and equivalent.
+     List<String> shardList = new ArrayList<>(localShards.keySet());
+     for (ShardInformation shardInfo : localShards.values()) {
+         // Shards flagged inactive (e.g. mid-add before the replica is fully joined)
+         // are excluded from the persisted list.
+         if (!shardInfo.isShardActiveMember()) {
+             shardList.remove(shardInfo.getShardName());
+         }
+     }
+     LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList);
+     saveSnapshot(new ShardManagerSnapshot(shardList));
+ }
+
+ /**
+  * Reconciles the statically configured shard list for this member with the shard list
+  * restored from the ShardManager snapshot, so runtime replica additions/removals
+  * survive a restart. For each snapshotted shard not in the static configuration the
+  * member is added as a replica; any configured shard absent from the snapshot had its
+  * replica removed at runtime and is dropped from the configuration.
+  */
+ private void handleShardRecovery(SnapshotOffer offer) {
+ LOG.debug ("{}: in handleShardRecovery", persistenceId());
+ ShardManagerSnapshot snapshot = (ShardManagerSnapshot)offer.snapshot();
+ String currentMember = cluster.getCurrentMemberName();
+ // Shards this member is configured to host per the static configuration.
+ Set<String> configuredShardList =
+ new HashSet<>(configuration.getMemberShardNames(currentMember));
+ for (String shard : snapshot.getShardList()) {
+ if (!configuredShardList.contains(shard)) {
+ // add the current member as a replica for the shard
+ LOG.debug ("{}: adding shard {}", persistenceId(), shard);
+ configuration.addMemberReplicaForShard(shard, currentMember);
+ } else {
+ // Present in both snapshot and configuration — remove from the working set so
+ // only the configured-but-not-snapshotted shards remain after this loop.
+ configuredShardList.remove(shard);
+ }
+ }
+ for (String shard : configuredShardList) {
+ // remove the member as a replica for the shard
+ LOG.debug ("{}: removing shard {}", persistenceId(), shard);
+ configuration.removeMemberReplicaForShard(shard, currentMember);
+ }
+ }
+
@VisibleForTesting
protected static class ShardInformation {
private final ShardIdentifier shardId;
private DatastoreContext datastoreContext;
private Shard.AbstractBuilder<?, ?> builder;
private final ShardPeerAddressResolver addressResolver;
+ private boolean shardActiveStatus = true;
private ShardInformation(String shardName, ShardIdentifier shardId,
Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
void setLeaderVersion(short leaderVersion) {
this.leaderVersion = leaderVersion;
}
+
+ // Marks whether this member is an active replica for the shard; inactive shards
+ // are excluded from the list saved by persistShardList().
+ void setShardActiveMember(boolean flag) {
+ shardActiveStatus = flag;
+ }
+
+ // True when this shard should be included in the persisted shard list
+ // (defaults to true; see shardActiveStatus field initializer).
+ boolean isShardActiveMember() {
+ return shardActiveStatus;
+ }
}
private static class OnShardInitialized {
return this;
}
- public Props props() {
+ protected void verify() {
+ // NOTE(review): verify() both seals the builder (sealed = true) and validates the
+ // required fields; it was split out of props() — presumably so subclasses/tests can
+ // reuse or extend the validation — TODO confirm against callers.
sealed = true;
Preconditions.checkNotNull(cluster, "cluster should not be null");
Preconditions.checkNotNull(configuration, "configuration should not be null");
Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
+ }
+
+ // Builds the ShardManager Props after validating (and sealing) the builder state.
+ public Props props() {
+ verify();
return Props.create(ShardManager.class, this);
}
}