import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotOffer;
+import akka.persistence.SnapshotSelectionCriteria;
import akka.serialization.Serialization;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.Sets;
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import org.apache.commons.lang3.SerializationUtils;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
private DatastoreSnapshot restoreFromSnapshot;
- private ShardManagerSnapshot recoveredSnapshot;
+ private ShardManagerSnapshot currentSnapshot;
private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
} else if(message instanceof ServerRemoved){
onShardReplicaRemoved((ServerRemoved) message);
} else if (message instanceof SaveSnapshotSuccess) {
- LOG.debug("{} saved ShardManager snapshot successfully", persistenceId());
+ onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
} else if (message instanceof SaveSnapshotFailure) {
LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
persistenceId(), ((SaveSnapshotFailure) message).cause());
}
byte[] shardManagerSnapshot = null;
+ if(currentSnapshot != null) {
+ shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
+ }
+
ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
- boolean shardWasInRecoveredSnapshot = recoveredSnapshot != null &&
- recoveredSnapshot.getShardList().contains(shardName);
+ boolean shardWasInRecoveredSnapshot = currentSnapshot != null &&
+ currentSnapshot.getShardList().contains(shardName);
Map<String, String> peerAddresses;
boolean isActiveMember;
@Override
protected void handleRecover(Object message) throws Exception {
if (message instanceof RecoveryCompleted) {
- LOG.info("Recovery complete : {}", persistenceId());
-
- // We no longer persist SchemaContext modules so delete all the prior messages from the akka
- // journal on upgrade from Helium.
- deleteMessages(lastSequenceNr());
- createLocalShards();
+ onRecoveryCompleted();
} else if (message instanceof SnapshotOffer) {
- onSnapshotOffer((SnapshotOffer) message);
+ applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
+ }
+ }
+
+ private void onRecoveryCompleted() {
+ LOG.info("Recovery complete : {}", persistenceId());
+
+ // We no longer persist SchemaContext modules so delete all the prior messages from the akka
+ // journal on upgrade from Helium.
+ deleteMessages(lastSequenceNr());
+
+ if(currentSnapshot == null && restoreFromSnapshot != null &&
+ restoreFromSnapshot.getShardManagerSnapshot() != null) {
+ try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
+ restoreFromSnapshot.getShardManagerSnapshot()))) {
+ ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
+
+ LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
+
+ applyShardManagerSnapshot(snapshot);
+ } catch(Exception e) {
+ LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
+ }
}
+
+ createLocalShards();
}
private void findLocalShard(FindLocalShard message) {
}
}
LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
- saveSnapshot(new ShardManagerSnapshot(shardList));
+ saveSnapshot(updateShardManagerSnapshot(shardList));
}
- private void onSnapshotOffer(SnapshotOffer offer) {
- recoveredSnapshot = (ShardManagerSnapshot)offer.snapshot();
+ private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
+ currentSnapshot = new ShardManagerSnapshot(shardList);
+ return currentSnapshot;
+ }
+
+ private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
+ currentSnapshot = snapshot;
- LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), recoveredSnapshot);
+ LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
String currentMember = cluster.getCurrentMemberName();
Set<String> configuredShardList =
new HashSet<>(configuration.getMemberShardNames(currentMember));
- for (String shard : recoveredSnapshot.getShardList()) {
+ for (String shard : currentSnapshot.getShardList()) {
if (!configuredShardList.contains(shard)) {
// add the current member as a replica for the shard
LOG.debug ("{}: adding shard {}", persistenceId(), shard);
}
}
+ private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
+ LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
+ persistenceId());
+ deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), (successMessage.metadata().timestamp() - 1)));
+ }
+
private static class ForwardedAddServerReply {
ShardInformation shardInfo;
AddServerReply addServerReply;