private DatastoreSnapshot restoreFromSnapshot;
+ private ShardManagerSnapshot recoveredSnapshot;
+
private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
private final String persistenceId;
}
private void onCreateShard(CreateShard createShard) {
+ LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
+
Object reply;
try {
String shardName = createShard.getModuleShardConfig().getShardName();
if(localShards.containsKey(shardName)) {
+ LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
} else {
doCreateShard(createShard);
reply = new akka.actor.Status.Success(null);
}
} catch (Exception e) {
- LOG.error("onCreateShard failed", e);
+ LOG.error("{}: onCreateShard failed", persistenceId(), e);
reply = new akka.actor.Status.Failure(e);
}
ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+ boolean shardWasInRecoveredSnapshot = recoveredSnapshot != null &&
+ recoveredSnapshot.getShardList().contains(shardName);
+
Map<String, String> peerAddresses;
boolean isActiveMember;
- if(configuration.getMembersFromShardName(shardName).contains(cluster.getCurrentMemberName())) {
+ if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
+ contains(cluster.getCurrentMemberName())) {
peerAddresses = getPeerAddresses(shardName);
isActiveMember = true;
} else {
- // The local member is not in the given shard member configuration. In this case we'll create
+ // The local member is not in the static shard member configuration and the shard did not
+ // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
// the shard with no peers and with elections disabled so it stays as follower. A
// subsequent AddServer request will be needed to make it an active member.
isActiveMember = false;
customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
}
- LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
- moduleShardConfig.getShardMemberNames(), peerAddresses);
+ LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
+ persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
+ isActiveMember);
ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
deleteMessages(lastSequenceNr());
createLocalShards();
} else if (message instanceof SnapshotOffer) {
- handleShardRecovery((SnapshotOffer) message);
+ onSnapshotOffer((SnapshotOffer) message);
}
}
for(String shardName : memberShardNames){
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
+
+ LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
+
Map<String, String> peerAddresses = getPeerAddresses(shardName);
localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
saveSnapshot(new ShardManagerSnapshot(shardList));
}
- private void handleShardRecovery(SnapshotOffer offer) {
- LOG.debug ("{}: in handleShardRecovery", persistenceId());
- ShardManagerSnapshot snapshot = (ShardManagerSnapshot)offer.snapshot();
+ private void onSnapshotOffer(SnapshotOffer offer) {
+ recoveredSnapshot = (ShardManagerSnapshot)offer.snapshot();
+
+ LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), recoveredSnapshot);
+
String currentMember = cluster.getCurrentMemberName();
Set<String> configuredShardList =
new HashSet<>(configuration.getMemberShardNames(currentMember));
- for (String shard : snapshot.getShardList()) {
+ for (String shard : recoveredSnapshot.getShardList()) {
if (!configuredShardList.contains(shard)) {
// add the current member as a replica for the shard
LOG.debug ("{}: adding shard {}", persistenceId(), shard);