2 * Copyright (c) 2017 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.persistence;
10 import static com.google.common.base.Preconditions.checkArgument;
12 import akka.actor.ExtendedActorSystem;
13 import akka.dispatch.Futures;
14 import akka.persistence.SelectedSnapshot;
15 import akka.persistence.SnapshotMetadata;
16 import akka.persistence.SnapshotSelectionCriteria;
17 import akka.persistence.serialization.Snapshot;
18 import akka.persistence.serialization.SnapshotSerializer;
19 import akka.persistence.snapshot.japi.SnapshotStore;
20 import akka.serialization.JavaSerializer;
21 import com.google.common.annotations.VisibleForTesting;
22 import com.google.common.io.ByteStreams;
23 import com.typesafe.config.Config;
24 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
25 import java.io.BufferedInputStream;
27 import java.io.FileInputStream;
28 import java.io.IOException;
29 import java.io.InputStream;
30 import java.io.ObjectInputStream;
31 import java.io.ObjectOutputStream;
32 import java.net.URLDecoder;
33 import java.net.URLEncoder;
34 import java.nio.charset.StandardCharsets;
35 import java.nio.file.Files;
36 import java.nio.file.StandardCopyOption;
37 import java.util.ArrayDeque;
38 import java.util.Arrays;
39 import java.util.Collection;
40 import java.util.Collections;
41 import java.util.Deque;
42 import java.util.List;
43 import java.util.Optional;
44 import java.util.concurrent.Callable;
45 import java.util.stream.Collector;
46 import java.util.stream.Collectors;
47 import java.util.stream.Stream;
48 import org.eclipse.jdt.annotation.Nullable;
49 import org.opendaylight.controller.cluster.io.InputOutputStreamFactory;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52 import scala.concurrent.ExecutionContext;
53 import scala.concurrent.Future;
56 * Akka SnapshotStore implementation backed by the local file system. This class was patterned after akka's
57 * LocalSnapshotStore class and exists because akka's version serializes to a byte[] before persisting
58 * to the file which will fail if the data reaches or exceeds Integer.MAX_VALUE in size. This class avoids that issue
59 * by serializing the data directly to the file.
61 * @author Thomas Pantelis
63 public class LocalSnapshotStore extends SnapshotStore {
64 private static final Logger LOG = LoggerFactory.getLogger(LocalSnapshotStore.class);
65 private static final int PERSISTENCE_ID_START_INDEX = "snapshot-".length();
67 private final InputOutputStreamFactory streamFactory;
68 private final ExecutionContext executionContext;
69 private final int maxLoadAttempts;
70 private final File snapshotDir;
72 @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Akka class design")
73 public LocalSnapshotStore(final Config config) {
74 executionContext = context().system().dispatchers().lookup(config.getString("stream-dispatcher"));
75 snapshotDir = new File(config.getString("dir"));
77 final int localMaxLoadAttempts = config.getInt("max-load-attempts");
78 maxLoadAttempts = localMaxLoadAttempts > 0 ? localMaxLoadAttempts : 1;
80 if (config.getBoolean("use-lz4-compression")) {
81 final String size = config.getString("lz4-blocksize");
82 streamFactory = InputOutputStreamFactory.lz4(size);
83 LOG.debug("Using LZ4 Input/Output Stream, blocksize: {}", size);
85 streamFactory = InputOutputStreamFactory.simple();
86 LOG.debug("Using plain Input/Output Stream");
89 LOG.debug("LocalSnapshotStore ctor: snapshotDir: {}, maxLoadAttempts: {}", snapshotDir, maxLoadAttempts);
93 public void preStart() throws Exception {
94 if (!snapshotDir.isDirectory()) {
95 // Try to create the directory, on failure double check if someone else beat us to it.
96 if (!snapshotDir.mkdirs() && !snapshotDir.isDirectory()) {
97 throw new IOException("Failed to create snapshot directory " + snapshotDir.getCanonicalPath());
105 public Future<Optional<SelectedSnapshot>> doLoadAsync(final String persistenceId,
106 final SnapshotSelectionCriteria criteria) {
107 LOG.debug("In doLoadAsync - persistenceId: {}, criteria: {}", persistenceId, criteria);
109 // Select the youngest 'maxLoadAttempts' snapshots that match the criteria. This may help in situations where
110 // saving of a snapshot could not be completed because of a JVM crash. Hence, an attempt to load that snapshot
111 // will fail but loading an older snapshot may succeed.
113 Deque<SnapshotMetadata> metadatas = getSnapshotMetadatas(persistenceId, criteria).stream()
114 .sorted(LocalSnapshotStore::compare).collect(reverse()).stream().limit(maxLoadAttempts)
115 .collect(Collectors.toCollection(ArrayDeque::new));
117 if (metadatas.isEmpty()) {
118 return Futures.successful(Optional.empty());
121 LOG.debug("doLoadAsync - found: {}", metadatas);
123 return Futures.future(() -> doLoad(metadatas), executionContext);
126 private Optional<SelectedSnapshot> doLoad(final Deque<SnapshotMetadata> metadatas) throws IOException {
127 SnapshotMetadata metadata = metadatas.removeFirst();
128 File file = toSnapshotFile(metadata);
130 LOG.debug("doLoad {}", file);
133 Object data = deserialize(file);
135 LOG.debug("deserialized data: {}", data);
137 return Optional.of(new SelectedSnapshot(metadata, data));
138 } catch (IOException e) {
139 LOG.error("Error loading snapshot file {}, remaining attempts: {}", file, metadatas.size(), e);
141 if (metadatas.isEmpty()) {
145 return doLoad(metadatas);
149 private Object deserialize(final File file) throws IOException {
150 return JavaSerializer.currentSystem().withValue((ExtendedActorSystem) context().system(),
151 (Callable<Object>) () -> {
152 try (ObjectInputStream in = new ObjectInputStream(streamFactory.createInputStream(file))) {
153 return in.readObject();
154 } catch (ClassNotFoundException e) {
155 throw new IOException("Error loading snapshot file " + file, e);
156 } catch (IOException e) {
157 LOG.debug("Error loading snapshot file {}", file, e);
158 return tryDeserializeAkkaSnapshot(file);
163 private Object tryDeserializeAkkaSnapshot(final File file) throws IOException {
164 LOG.debug("tryDeserializeAkkaSnapshot {}", file);
166 // The snapshot was probably previously stored via akka's LocalSnapshotStore which wraps the data
167 // in a Snapshot instance and uses the SnapshotSerializer to serialize it to a byte[]. So we'll use
168 // the SnapshotSerializer to try to de-serialize it.
170 SnapshotSerializer snapshotSerializer = new SnapshotSerializer((ExtendedActorSystem) context().system());
172 try (InputStream in = new BufferedInputStream(new FileInputStream(file))) {
173 return ((Snapshot)snapshotSerializer.fromBinary(ByteStreams.toByteArray(in))).data();
178 public Future<Void> doSaveAsync(final SnapshotMetadata metadata, final Object snapshot) {
179 LOG.debug("In doSaveAsync - metadata: {}, snapshot: {}", metadata, snapshot);
181 return Futures.future(() -> doSave(metadata, snapshot), executionContext);
184 private Void doSave(final SnapshotMetadata metadata, final Object snapshot) throws IOException {
185 final File actual = toSnapshotFile(metadata);
186 final File temp = File.createTempFile(actual.getName(), null, snapshotDir);
188 LOG.debug("Saving to temp file: {}", temp);
190 try (ObjectOutputStream out = new ObjectOutputStream(streamFactory.createOutputStream(temp))) {
191 out.writeObject(snapshot);
192 } catch (IOException e) {
193 LOG.error("Error saving snapshot file {}. Deleting file..", temp, e);
194 if (!temp.delete()) {
195 LOG.error("Failed to successfully delete file {}", temp);
200 LOG.debug("Renaming to: {}", actual);
202 Files.move(temp.toPath(), actual.toPath(), StandardCopyOption.ATOMIC_MOVE);
203 } catch (IOException e) {
204 LOG.warn("Failed to move {} to {}. Deleting {}..", temp, actual, temp, e);
205 if (!temp.delete()) {
206 LOG.error("Failed to successfully delete file {}", temp);
215 public Future<Void> doDeleteAsync(final SnapshotMetadata metadata) {
216 LOG.debug("In doDeleteAsync - metadata: {}", metadata);
218 // Multiple snapshot files here mean that there were multiple snapshots for this seqNr - we delete all of them.
219 // Usually snapshot-stores would keep one snapshot per sequenceNr however here in the file-based one we
220 // timestamp snapshots and allow multiple to be kept around (for the same seqNr) if desired.
222 return Futures.future(() -> doDelete(metadata), executionContext);
226 public Future<Void> doDeleteAsync(final String persistenceId, final SnapshotSelectionCriteria criteria) {
227 LOG.debug("In doDeleteAsync - persistenceId: {}, criteria: {}", persistenceId, criteria);
229 return Futures.future(() -> doDelete(persistenceId, criteria), executionContext);
232 private Void doDelete(final String persistenceId, final SnapshotSelectionCriteria criteria) {
233 final List<File> files = getSnapshotMetadatas(persistenceId, criteria).stream()
234 .flatMap(md -> Stream.of(toSnapshotFile(md))).collect(Collectors.toList());
236 LOG.debug("Deleting files: {}", files);
238 files.forEach(file -> {
240 Files.delete(file.toPath());
241 } catch (IOException | SecurityException e) {
242 LOG.error("Unable to delete snapshot file: {}, persistenceId: {} ", file, persistenceId);
248 private Void doDelete(final SnapshotMetadata metadata) {
249 final Collection<File> files = getSnapshotFiles(metadata);
251 LOG.debug("Deleting files: {}", files);
253 files.forEach(file -> {
255 Files.delete(file.toPath());
256 } catch (IOException | SecurityException e) {
257 LOG.error("Unable to delete snapshot file: {}", file);
263 private Collection<File> getSnapshotFiles(final String persistenceId) {
264 String encodedPersistenceId = encode(persistenceId);
266 File[] files = snapshotDir.listFiles((dir, name) -> {
267 int persistenceIdEndIndex = name.lastIndexOf('-', name.lastIndexOf('-') - 1);
268 return PERSISTENCE_ID_START_INDEX + encodedPersistenceId.length() == persistenceIdEndIndex
269 && name.startsWith(encodedPersistenceId, PERSISTENCE_ID_START_INDEX) && !name.endsWith(".tmp");
273 return Collections.emptyList();
276 if (LOG.isDebugEnabled()) {
277 LOG.debug("getSnapshotFiles for persistenceId: {}, found files: {}", encodedPersistenceId,
278 Arrays.toString(files));
281 return Arrays.asList(files);
284 private Collection<File> getSnapshotFiles(final SnapshotMetadata metadata) {
285 return getSnapshotFiles(metadata.persistenceId()).stream().filter(file -> {
286 SnapshotMetadata possible = extractMetadata(file);
287 return possible != null && possible.sequenceNr() == metadata.sequenceNr()
288 && (metadata.timestamp() == 0L || possible.timestamp() == metadata.timestamp());
289 }).collect(Collectors.toList());
292 private Collection<SnapshotMetadata> getSnapshotMetadatas(final String persistenceId,
293 final SnapshotSelectionCriteria criteria) {
294 return getSnapshotFiles(persistenceId).stream().flatMap(file -> toStream(extractMetadata(file)))
295 .filter(criteria::matches).collect(Collectors.toList());
298 private static Stream<SnapshotMetadata> toStream(final @Nullable SnapshotMetadata md) {
299 return md != null ? Stream.of(md) : Stream.empty();
302 private static @Nullable SnapshotMetadata extractMetadata(final File file) {
303 String name = file.getName();
304 int sequenceNumberEndIndex = name.lastIndexOf('-');
305 int persistenceIdEndIndex = name.lastIndexOf('-', sequenceNumberEndIndex - 1);
306 if (PERSISTENCE_ID_START_INDEX >= persistenceIdEndIndex) {
311 // Since the persistenceId is url encoded in the filename, we need
312 // to decode relevant filename's part to obtain persistenceId back
313 String persistenceId = decode(name.substring(PERSISTENCE_ID_START_INDEX, persistenceIdEndIndex));
314 long sequenceNumber = Long.parseLong(name.substring(persistenceIdEndIndex + 1, sequenceNumberEndIndex));
315 long timestamp = Long.parseLong(name.substring(sequenceNumberEndIndex + 1));
316 return new SnapshotMetadata(persistenceId, sequenceNumber, timestamp);
317 } catch (NumberFormatException e) {
322 private File toSnapshotFile(final SnapshotMetadata metadata) {
323 return new File(snapshotDir, String.format("snapshot-%s-%d-%d", encode(metadata.persistenceId()),
324 metadata.sequenceNr(), metadata.timestamp()));
327 private static <T> Collector<T, ?, List<T>> reverse() {
328 return Collectors.collectingAndThen(Collectors.toList(), list -> {
329 Collections.reverse(list);
334 private static String encode(final String str) {
335 return URLEncoder.encode(str, StandardCharsets.UTF_8);
338 private static String decode(final String str) {
339 return URLDecoder.decode(str, StandardCharsets.UTF_8);
343 static int compare(final SnapshotMetadata m1, final SnapshotMetadata m2) {
344 checkArgument(m1.persistenceId().equals(m2.persistenceId()),
345 "Persistence id does not match. id1: %s, id2: %s", m1.persistenceId(), m2.persistenceId());
346 final int cmp = Long.compare(m1.timestamp(), m2.timestamp());
347 return cmp != 0 ? cmp : Long.compare(m1.sequenceNr(), m2.sequenceNr());