import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotOffer;
+import akka.persistence.SnapshotSelectionCriteria;
import akka.serialization.Serialization;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.Sets;
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import org.apache.commons.lang3.SerializationUtils;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
private DatastoreSnapshot restoreFromSnapshot;
+ private ShardManagerSnapshot currentSnapshot;
+
private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
private final String persistenceId;
} else if(message instanceof ServerRemoved){
onShardReplicaRemoved((ServerRemoved) message);
} else if (message instanceof SaveSnapshotSuccess) {
- LOG.debug("{} saved ShardManager snapshot successfully", persistenceId());
+ onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
} else if (message instanceof SaveSnapshotFailure) {
LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
persistenceId(), ((SaveSnapshotFailure) message).cause());
}
byte[] shardManagerSnapshot = null;
+ if(currentSnapshot != null) {
+ shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
+ }
+
ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
}
private void onCreateShard(CreateShard createShard) {
+ LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
+
Object reply;
try {
String shardName = createShard.getModuleShardConfig().getShardName();
if(localShards.containsKey(shardName)) {
+ LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
} else {
doCreateShard(createShard);
reply = new akka.actor.Status.Success(null);
}
} catch (Exception e) {
- LOG.error("onCreateShard failed", e);
+ LOG.error("{}: onCreateShard failed", persistenceId(), e);
reply = new akka.actor.Status.Failure(e);
}
ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+ boolean shardWasInRecoveredSnapshot = currentSnapshot != null &&
+ currentSnapshot.getShardList().contains(shardName);
+
Map<String, String> peerAddresses;
boolean isActiveMember;
- if(configuration.getMembersFromShardName(shardName).contains(cluster.getCurrentMemberName())) {
+ if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
+ contains(cluster.getCurrentMemberName())) {
peerAddresses = getPeerAddresses(shardName);
isActiveMember = true;
} else {
- // The local member is not in the given shard member configuration. In this case we'll create
+ // The local member is not in the static shard member configuration and the shard did not
+ // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
// the shard with no peers and with elections disabled so it stays as follower. A
// subsequent AddServer request will be needed to make it an active member.
isActiveMember = false;
customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
}
- LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
- moduleShardConfig.getShardMemberNames(), peerAddresses);
+ LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
+ persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
+ isActiveMember);
ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
@Override
protected void handleRecover(Object message) throws Exception {
if (message instanceof RecoveryCompleted) {
- LOG.info("Recovery complete : {}", persistenceId());
-
- // We no longer persist SchemaContext modules so delete all the prior messages from the akka
- // journal on upgrade from Helium.
- deleteMessages(lastSequenceNr());
- createLocalShards();
+ onRecoveryCompleted();
} else if (message instanceof SnapshotOffer) {
- handleShardRecovery((SnapshotOffer) message);
+ applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
}
}
+ private void onRecoveryCompleted() {
+ LOG.info("Recovery complete : {}", persistenceId());
+
+ // We no longer persist SchemaContext modules so delete all the prior messages from the akka
+ // journal on upgrade from Helium.
+ deleteMessages(lastSequenceNr());
+
+ if(currentSnapshot == null && restoreFromSnapshot != null &&
+ restoreFromSnapshot.getShardManagerSnapshot() != null) {
+ try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
+ restoreFromSnapshot.getShardManagerSnapshot()))) {
+ ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
+
+ LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
+
+ applyShardManagerSnapshot(snapshot);
+ } catch(Exception e) {
+ LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
+ }
+ }
+
+ createLocalShards();
+ }
+
private void findLocalShard(FindLocalShard message) {
final ShardInformation shardInformation = localShards.get(message.getShardName());
for(String shardName : memberShardNames){
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
+
+ LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
+
Map<String, String> peerAddresses = getPeerAddresses(shardName);
localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
}
}
LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
- saveSnapshot(new ShardManagerSnapshot(shardList));
+ saveSnapshot(updateShardManagerSnapshot(shardList));
}
- private void handleShardRecovery(SnapshotOffer offer) {
- LOG.debug ("{}: in handleShardRecovery", persistenceId());
- ShardManagerSnapshot snapshot = (ShardManagerSnapshot)offer.snapshot();
+ private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
+ currentSnapshot = new ShardManagerSnapshot(shardList);
+ return currentSnapshot;
+ }
+
+ private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
+ currentSnapshot = snapshot;
+
+ LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
+
String currentMember = cluster.getCurrentMemberName();
Set<String> configuredShardList =
new HashSet<>(configuration.getMemberShardNames(currentMember));
- for (String shard : snapshot.getShardList()) {
+ for (String shard : currentSnapshot.getShardList()) {
if (!configuredShardList.contains(shard)) {
// add the current member as a replica for the shard
LOG.debug ("{}: adding shard {}", persistenceId(), shard);
}
}
+ private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
+ LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
+ persistenceId());
+ deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), (successMessage.metadata().timestamp() - 1)));
+ }
+
private static class ForwardedAddServerReply {
ShardInformation shardInfo;
AddServerReply addServerReply;