import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotOffer;
+import akka.persistence.SnapshotSelectionCriteria;
import akka.serialization.Serialization;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
} else if(message instanceof ServerRemoved){
onShardReplicaRemoved((ServerRemoved) message);
} else if (message instanceof SaveSnapshotSuccess) {
- LOG.debug("{} saved ShardManager snapshot successfully", persistenceId());
+ onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
} else if (message instanceof SaveSnapshotFailure) {
LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
persistenceId(), ((SaveSnapshotFailure) message).cause());
}
}
+ private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
+ LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
+ persistenceId());
+ deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), (successMessage.metadata().timestamp() - 1)));
+ }
+
private static class ForwardedAddServerReply {
ShardInformation shardInfo;
AddServerReply addServerReply;