*/
package org.opendaylight.controller.cluster.persistence;
+import static com.google.common.base.Preconditions.checkArgument;
+
import akka.actor.ExtendedActorSystem;
import akka.dispatch.Futures;
import akka.persistence.SelectedSnapshot;
import akka.persistence.serialization.Snapshot;
import akka.persistence.serialization.SnapshotSerializer;
import akka.persistence.snapshot.japi.SnapshotStore;
+import akka.serialization.JavaSerializer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.ByteStreams;
import com.typesafe.config.Config;
import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
-import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.Callable;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import javax.annotation.Nullable;
+import org.eclipse.jdt.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext;
private final int maxLoadAttempts;
private final File snapshotDir;
- public LocalSnapshotStore(Config config) {
+ public LocalSnapshotStore(final Config config) {
this.executionContext = context().system().dispatchers().lookup(config.getString("stream-dispatcher"));
snapshotDir = new File(config.getString("dir"));
}
@Override
- public Future<Optional<SelectedSnapshot>> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
+ public Future<Optional<SelectedSnapshot>> doLoadAsync(final String persistenceId,
+ final SnapshotSelectionCriteria criteria) {
LOG.debug("In doLoadAsync - persistenceId: {}, criteria: {}", persistenceId, criteria);
// Select the youngest 'maxLoadAttempts' snapshots that match the criteria. This may help in situations where
return Futures.future(() -> doLoad(metadatas), executionContext);
}
- private Optional<SelectedSnapshot> doLoad(Deque<SnapshotMetadata> metadatas) throws IOException {
+ private Optional<SelectedSnapshot> doLoad(final Deque<SnapshotMetadata> metadatas) throws IOException {
SnapshotMetadata metadata = metadatas.removeFirst();
- File file = toSnapshotFile(metadata, "");
+ File file = toSnapshotFile(metadata);
LOG.debug("doLoad {}", file);
}
}
- private Object deserialize(File file) throws IOException {
- try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(file))) {
- return in.readObject();
- } catch (ClassNotFoundException e) {
- throw new IOException("Error loading snapshot file " + file, e);
- } catch (IOException e) {
- LOG.debug("Error loading snapshot file {}", file, e);
-
- return tryDeserializeAkkaSnapshot(file);
- }
+ private Object deserialize(final File file) throws IOException {
+ return JavaSerializer.currentSystem().withValue((ExtendedActorSystem) context().system(),
+ (Callable<Object>) () -> {
+ try (ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(new FileInputStream(file)))) {
+ return in.readObject();
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Error loading snapshot file " + file, e);
+ } catch (IOException e) {
+ LOG.debug("Error loading snapshot file {}", file, e);
+ return tryDeserializeAkkaSnapshot(file);
+ }
+ });
}
- private Object tryDeserializeAkkaSnapshot(File file) throws IOException {
+ private Object tryDeserializeAkkaSnapshot(final File file) throws IOException {
LOG.debug("tryDeserializeAkkaSnapshot {}", file);
// The snapshot was probably previously stored via akka's LocalSnapshotStore which wraps the data
}
@Override
- public Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot) {
+ public Future<Void> doSaveAsync(final SnapshotMetadata metadata, final Object snapshot) {
LOG.debug("In doSaveAsync - metadata: {}, snapshot: {}", metadata, snapshot);
return Futures.future(() -> doSave(metadata, snapshot), executionContext);
}
- private Void doSave(SnapshotMetadata metadata, Object snapshot) throws IOException {
- File temp = toSnapshotFile(metadata, ".tmp");
+ private Void doSave(final SnapshotMetadata metadata, final Object snapshot) throws IOException {
+ final File actual = toSnapshotFile(metadata);
+ final File temp = File.createTempFile(actual.getName(), null, snapshotDir);
LOG.debug("Saving to temp file: {}", temp);
- try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(temp))) {
+ try (ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(temp)))) {
out.writeObject(snapshot);
} catch (IOException e) {
- LOG.error("Error saving snapshot file {}", temp, e);
+ LOG.error("Error saving snapshot file {}. Deleting file..", temp, e);
+ if (!temp.delete()) {
+ LOG.error("Failed to successfully delete file {}", temp);
+ }
throw e;
}
- File actual = toSnapshotFile(metadata, "");
-
LOG.debug("Renaming to: {}", actual);
-
- if (!temp.renameTo(actual)) {
- throw new IOException(String.format("Failed to rename %s to %s", temp, actual));
+ try {
+ Files.move(temp.toPath(), actual.toPath(), StandardCopyOption.ATOMIC_MOVE);
+ } catch (IOException e) {
+ LOG.warn("Failed to move {} to {}. Deleting {}..", temp, actual, temp, e);
+ if (!temp.delete()) {
+ LOG.error("Failed to successfully delete file {}", temp);
+ }
+ throw e;
}
return null;
}
@Override
- public Future<Void> doDeleteAsync(SnapshotMetadata metadata) {
+ public Future<Void> doDeleteAsync(final SnapshotMetadata metadata) {
LOG.debug("In doDeleteAsync - metadata: {}", metadata);
// Multiple snapshot files here mean that there were multiple snapshots for this seqNr - we delete all of them.
}
@Override
- public Future<Void> doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
+ public Future<Void> doDeleteAsync(final String persistenceId, final SnapshotSelectionCriteria criteria) {
LOG.debug("In doDeleteAsync - persistenceId: {}, criteria: {}", persistenceId, criteria);
return Futures.future(() -> doDelete(persistenceId, criteria), executionContext);
}
- private Void doDelete(String persistenceId, SnapshotSelectionCriteria criteria) {
+ private Void doDelete(final String persistenceId, final SnapshotSelectionCriteria criteria) {
final List<File> files = getSnapshotMetadatas(persistenceId, criteria).stream()
- .flatMap(md -> Stream.of(toSnapshotFile(md, ""))).collect(Collectors.toList());
+ .flatMap(md -> Stream.of(toSnapshotFile(md))).collect(Collectors.toList());
LOG.debug("Deleting files: {}", files);
- files.forEach(file -> file.delete());
+ files.forEach(file -> {
+ try {
+ Files.delete(file.toPath());
+ } catch (IOException | SecurityException e) {
+ LOG.error("Unable to delete snapshot file: {}, persistenceId: {} ", file, persistenceId);
+ }
+ });
return null;
}
- private Void doDelete(SnapshotMetadata metadata) {
+ private Void doDelete(final SnapshotMetadata metadata) {
final Collection<File> files = getSnapshotFiles(metadata);
LOG.debug("Deleting files: {}", files);
- files.forEach(file -> file.delete());
+ files.forEach(file -> {
+ try {
+ Files.delete(file.toPath());
+ } catch (IOException | SecurityException e) {
+ LOG.error("Unable to delete snapshot file: {}", file);
+ }
+ });
return null;
}
- private Collection<File> getSnapshotFiles(String persistenceId) {
+ private Collection<File> getSnapshotFiles(final String persistenceId) {
String encodedPersistenceId = encode(persistenceId);
- File[] files = snapshotDir.listFiles((FilenameFilter) (dir, name) -> {
+ File[] files = snapshotDir.listFiles((dir, name) -> {
int persistenceIdEndIndex = name.lastIndexOf('-', name.lastIndexOf('-') - 1);
return PERSISTENCE_ID_START_INDEX + encodedPersistenceId.length() == persistenceIdEndIndex
&& name.startsWith(encodedPersistenceId, PERSISTENCE_ID_START_INDEX) && !name.endsWith(".tmp");
return Arrays.asList(files);
}
- private Collection<File> getSnapshotFiles(SnapshotMetadata metadata) {
+ private Collection<File> getSnapshotFiles(final SnapshotMetadata metadata) {
return getSnapshotFiles(metadata.persistenceId()).stream().filter(file -> {
SnapshotMetadata possible = extractMetadata(file);
return possible != null && possible.sequenceNr() == metadata.sequenceNr()
}).collect(Collectors.toList());
}
- private Collection<SnapshotMetadata> getSnapshotMetadatas(String persistenceId,
- SnapshotSelectionCriteria criteria) {
+ private Collection<SnapshotMetadata> getSnapshotMetadatas(final String persistenceId,
+ final SnapshotSelectionCriteria criteria) {
return getSnapshotFiles(persistenceId).stream().flatMap(file -> toStream(extractMetadata(file)))
- .filter(md -> criteria.matches(md)).collect(Collectors.toList());
+ .filter(criteria::matches).collect(Collectors.toList());
}
- private static Stream<SnapshotMetadata> toStream(@Nullable SnapshotMetadata md) {
+ private static Stream<SnapshotMetadata> toStream(final @Nullable SnapshotMetadata md) {
return md != null ? Stream.of(md) : Stream.empty();
}
- @Nullable
- private static SnapshotMetadata extractMetadata(File file) {
+ private static @Nullable SnapshotMetadata extractMetadata(final File file) {
String name = file.getName();
int sequenceNumberEndIndex = name.lastIndexOf('-');
int persistenceIdEndIndex = name.lastIndexOf('-', sequenceNumberEndIndex - 1);
}
try {
- String persistenceId = name.substring(PERSISTENCE_ID_START_INDEX, persistenceIdEndIndex);
+ // Since the persistenceId is url encoded in the filename, we need
+ // to decode relevant filename's part to obtain persistenceId back
+ String persistenceId = decode(name.substring(PERSISTENCE_ID_START_INDEX, persistenceIdEndIndex));
long sequenceNumber = Long.parseLong(name.substring(persistenceIdEndIndex + 1, sequenceNumberEndIndex));
long timestamp = Long.parseLong(name.substring(sequenceNumberEndIndex + 1));
return new SnapshotMetadata(persistenceId, sequenceNumber, timestamp);
}
}
- private File toSnapshotFile(SnapshotMetadata metadata, String extension) {
- return new File(snapshotDir, String.format("snapshot-%s-%d-%d%s", encode(metadata.persistenceId()),
- metadata.sequenceNr(), metadata.timestamp(), extension));
+ private File toSnapshotFile(final SnapshotMetadata metadata) {
+ return new File(snapshotDir, String.format("snapshot-%s-%d-%d", encode(metadata.persistenceId()),
+ metadata.sequenceNr(), metadata.timestamp()));
}
private static <T> Collector<T, ?, List<T>> reverse() {
});
}
- private String encode(String str) {
+ private static String encode(final String str) {
try {
return URLEncoder.encode(str, StandardCharsets.UTF_8.name());
} catch (UnsupportedEncodingException e) {
}
}
+ private static String decode(final String str) {
+ try {
+ return URLDecoder.decode(str, StandardCharsets.UTF_8.name());
+ } catch (final UnsupportedEncodingException e) {
+ // Shouldn't happen
+ LOG.warn("Error decoding {}", str, e);
+ return str;
+ }
+ }
+
@VisibleForTesting
- static int compare(SnapshotMetadata m1, SnapshotMetadata m2) {
- return (int) (!m1.persistenceId().equals(m2.persistenceId())
- ? m1.persistenceId().compareTo(m2.persistenceId()) :
- m1.sequenceNr() != m2.sequenceNr() ? m1.sequenceNr() - m2.sequenceNr() :
- m1.timestamp() != m2.timestamp() ? m1.timestamp() - m2.timestamp() : 0);
+ static int compare(final SnapshotMetadata m1, final SnapshotMetadata m2) {
+ checkArgument(m1.persistenceId().equals(m2.persistenceId()),
+ "Persistence id does not match. id1: %s, id2: %s", m1.persistenceId(), m2.persistenceId());
+ final int cmp = Long.compare(m1.timestamp(), m2.timestamp());
+ return cmp != 0 ? cmp : Long.compare(m1.sequenceNr(), m2.sequenceNr());
}
}