import akka.persistence.serialization.Snapshot;
import akka.persistence.serialization.SnapshotSerializer;
import akka.persistence.snapshot.japi.SnapshotStore;
+import akka.serialization.JavaSerializer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.ByteStreams;
import com.typesafe.config.Config;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
-import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.Callable;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import javax.annotation.Nullable;
+import org.eclipse.jdt.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext;
}
private Object deserialize(final File file) throws IOException {
- try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(file))) {
- return in.readObject();
- } catch (ClassNotFoundException e) {
- throw new IOException("Error loading snapshot file " + file, e);
- } catch (IOException e) {
- LOG.debug("Error loading snapshot file {}", file, e);
-
- return tryDeserializeAkkaSnapshot(file);
- }
+ return JavaSerializer.currentSystem().withValue((ExtendedActorSystem) context().system(),
+ (Callable<Object>) () -> {
+ try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(file))) {
+ return in.readObject();
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Error loading snapshot file " + file, e);
+ } catch (IOException e) {
+ LOG.debug("Error loading snapshot file {}", file, e);
+ return tryDeserializeAkkaSnapshot(file);
+ }
+ });
}
private Object tryDeserializeAkkaSnapshot(final File file) throws IOException {
LOG.debug("Deleting files: {}", files);
- files.forEach(file -> file.delete());
+ files.forEach(File::delete);
return null;
}
LOG.debug("Deleting files: {}", files);
- files.forEach(file -> file.delete());
+ files.forEach(File::delete);
return null;
}
private Collection<File> getSnapshotFiles(final String persistenceId) {
String encodedPersistenceId = encode(persistenceId);
- File[] files = snapshotDir.listFiles((FilenameFilter) (dir, name) -> {
+ File[] files = snapshotDir.listFiles((dir, name) -> {
int persistenceIdEndIndex = name.lastIndexOf('-', name.lastIndexOf('-') - 1);
return PERSISTENCE_ID_START_INDEX + encodedPersistenceId.length() == persistenceIdEndIndex
&& name.startsWith(encodedPersistenceId, PERSISTENCE_ID_START_INDEX) && !name.endsWith(".tmp");
private Collection<SnapshotMetadata> getSnapshotMetadatas(final String persistenceId,
final SnapshotSelectionCriteria criteria) {
return getSnapshotFiles(persistenceId).stream().flatMap(file -> toStream(extractMetadata(file)))
- .filter(md -> criteria.matches(md)).collect(Collectors.toList());
+ .filter(criteria::matches).collect(Collectors.toList());
}
- private static Stream<SnapshotMetadata> toStream(@Nullable final SnapshotMetadata md) {
+ private static Stream<SnapshotMetadata> toStream(final @Nullable SnapshotMetadata md) {
return md != null ? Stream.of(md) : Stream.empty();
}
- @Nullable
- private static SnapshotMetadata extractMetadata(final File file) {
+ private static @Nullable SnapshotMetadata extractMetadata(final File file) {
String name = file.getName();
int sequenceNumberEndIndex = name.lastIndexOf('-');
int persistenceIdEndIndex = name.lastIndexOf('-', sequenceNumberEndIndex - 1);