Add buffering to LocalSnapshotStore
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / persistence / LocalSnapshotStore.java
1 /*
2  * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.persistence;
9
10 import akka.actor.ExtendedActorSystem;
11 import akka.dispatch.Futures;
12 import akka.persistence.SelectedSnapshot;
13 import akka.persistence.SnapshotMetadata;
14 import akka.persistence.SnapshotSelectionCriteria;
15 import akka.persistence.serialization.Snapshot;
16 import akka.persistence.serialization.SnapshotSerializer;
17 import akka.persistence.snapshot.japi.SnapshotStore;
18 import akka.serialization.JavaSerializer;
19 import com.google.common.annotations.VisibleForTesting;
20 import com.google.common.io.ByteStreams;
21 import com.typesafe.config.Config;
22 import java.io.BufferedInputStream;
23 import java.io.BufferedOutputStream;
24 import java.io.File;
25 import java.io.FileInputStream;
26 import java.io.FileOutputStream;
27 import java.io.IOException;
28 import java.io.InputStream;
29 import java.io.ObjectInputStream;
30 import java.io.ObjectOutputStream;
31 import java.io.UnsupportedEncodingException;
32 import java.net.URLDecoder;
33 import java.net.URLEncoder;
34 import java.nio.charset.StandardCharsets;
35 import java.nio.file.Files;
36 import java.nio.file.StandardCopyOption;
37 import java.util.ArrayDeque;
38 import java.util.Arrays;
39 import java.util.Collection;
40 import java.util.Collections;
41 import java.util.Deque;
42 import java.util.List;
43 import java.util.Optional;
44 import java.util.concurrent.Callable;
45 import java.util.stream.Collector;
46 import java.util.stream.Collectors;
47 import java.util.stream.Stream;
48 import org.eclipse.jdt.annotation.Nullable;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51 import scala.concurrent.ExecutionContext;
52 import scala.concurrent.Future;
53
54 /**
55  * Akka SnapshotStore implementation backed by the local file system. This class was patterned after akka's
56  * LocalSnapshotStore class and exists because akka's version serializes to a byte[] before persisting
57  * to the file which will fail if the data reaches or exceeds Integer.MAX_VALUE in size. This class avoids that issue
58  * by serializing the data directly to the file.
59  *
60  * @author Thomas Pantelis
61  */
62 public class LocalSnapshotStore extends SnapshotStore {
63     private static final Logger LOG = LoggerFactory.getLogger(LocalSnapshotStore.class);
64
65     private static final int PERSISTENCE_ID_START_INDEX = "snapshot-".length();
66
67     private final ExecutionContext executionContext;
68     private final int maxLoadAttempts;
69     private final File snapshotDir;
70
71     public LocalSnapshotStore(final Config config) {
72         this.executionContext = context().system().dispatchers().lookup(config.getString("stream-dispatcher"));
73         snapshotDir = new File(config.getString("dir"));
74
75         int localMaxLoadAttempts = config.getInt("max-load-attempts");
76         maxLoadAttempts = localMaxLoadAttempts > 0 ? localMaxLoadAttempts : 1;
77
78         LOG.debug("LocalSnapshotStore ctor: snapshotDir: {}, maxLoadAttempts: {}", snapshotDir, maxLoadAttempts);
79     }
80
81     @Override
82     public void preStart() throws Exception {
83         if (!snapshotDir.isDirectory()) {
84             // Try to create the directory, on failure double check if someone else beat us to it.
85             if (!snapshotDir.mkdirs() && !snapshotDir.isDirectory()) {
86                 throw new IOException("Failed to create snapshot directory " + snapshotDir.getCanonicalPath());
87             }
88         }
89
90         super.preStart();
91     }
92
93     @Override
94     public Future<Optional<SelectedSnapshot>> doLoadAsync(final String persistenceId,
95                                                           final SnapshotSelectionCriteria criteria) {
96         LOG.debug("In doLoadAsync - persistenceId: {}, criteria: {}", persistenceId, criteria);
97
98         // Select the youngest 'maxLoadAttempts' snapshots that match the criteria. This may help in situations where
99         // saving of a snapshot could not be completed because of a JVM crash. Hence, an attempt to load that snapshot
100         // will fail but loading an older snapshot may succeed.
101
102         Deque<SnapshotMetadata> metadatas = getSnapshotMetadatas(persistenceId, criteria).stream()
103                 .sorted(LocalSnapshotStore::compare).collect(reverse()).stream().limit(maxLoadAttempts)
104                     .collect(Collectors.toCollection(ArrayDeque::new));
105
106         if (metadatas.isEmpty()) {
107             return Futures.successful(Optional.empty());
108         }
109
110         LOG.debug("doLoadAsync - found: {}", metadatas);
111
112         return Futures.future(() -> doLoad(metadatas), executionContext);
113     }
114
115     private Optional<SelectedSnapshot> doLoad(final Deque<SnapshotMetadata> metadatas) throws IOException {
116         SnapshotMetadata metadata = metadatas.removeFirst();
117         File file = toSnapshotFile(metadata);
118
119         LOG.debug("doLoad {}", file);
120
121         try {
122             Object data = deserialize(file);
123
124             LOG.debug("deserialized data: {}", data);
125
126             return Optional.of(new SelectedSnapshot(metadata, data));
127         } catch (IOException e) {
128             LOG.error("Error loading snapshot file {}, remaining attempts: {}", file, metadatas.size(), e);
129
130             if (metadatas.isEmpty()) {
131                 throw e;
132             }
133
134             return doLoad(metadatas);
135         }
136     }
137
138     private Object deserialize(final File file) throws IOException {
139         return JavaSerializer.currentSystem().withValue((ExtendedActorSystem) context().system(),
140             (Callable<Object>) () -> {
141                 try (ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(new FileInputStream(file)))) {
142                     return in.readObject();
143                 } catch (ClassNotFoundException e) {
144                     throw new IOException("Error loading snapshot file " + file, e);
145                 } catch (IOException e) {
146                     LOG.debug("Error loading snapshot file {}", file, e);
147                     return tryDeserializeAkkaSnapshot(file);
148                 }
149             });
150     }
151
152     private Object tryDeserializeAkkaSnapshot(final File file) throws IOException {
153         LOG.debug("tryDeserializeAkkaSnapshot {}", file);
154
155         // The snapshot was probably previously stored via akka's LocalSnapshotStore which wraps the data
156         // in a Snapshot instance and uses the SnapshotSerializer to serialize it to a byte[]. So we'll use
157         // the SnapshotSerializer to try to de-serialize it.
158
159         SnapshotSerializer snapshotSerializer = new SnapshotSerializer((ExtendedActorSystem) context().system());
160
161         try (InputStream in = new BufferedInputStream(new FileInputStream(file))) {
162             return ((Snapshot)snapshotSerializer.fromBinary(ByteStreams.toByteArray(in))).data();
163         }
164     }
165
166     @Override
167     public Future<Void> doSaveAsync(final SnapshotMetadata metadata, final Object snapshot) {
168         LOG.debug("In doSaveAsync - metadata: {}, snapshot: {}", metadata, snapshot);
169
170         return Futures.future(() -> doSave(metadata, snapshot), executionContext);
171     }
172
173     private Void doSave(final SnapshotMetadata metadata, final Object snapshot) throws IOException {
174         final File actual = toSnapshotFile(metadata);
175         final File temp = File.createTempFile(actual.getName(), null, snapshotDir);
176
177         LOG.debug("Saving to temp file: {}", temp);
178
179         try (ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(temp)))) {
180             out.writeObject(snapshot);
181         } catch (IOException e) {
182             LOG.error("Error saving snapshot file {}. Deleting file..", temp, e);
183             if (!temp.delete()) {
184                 LOG.error("Failed to successfully delete file {}", temp);
185             }
186             throw e;
187         }
188
189         LOG.debug("Renaming to: {}", actual);
190         try {
191             Files.move(temp.toPath(), actual.toPath(), StandardCopyOption.ATOMIC_MOVE);
192         } catch (IOException e) {
193             LOG.warn("Failed to move {} to {}. Deleting {}..", temp, actual, temp, e);
194             if (!temp.delete()) {
195                 LOG.error("Failed to successfully delete file {}", temp);
196             }
197             throw e;
198         }
199
200         return null;
201     }
202
203     @Override
204     public Future<Void> doDeleteAsync(final SnapshotMetadata metadata) {
205         LOG.debug("In doDeleteAsync - metadata: {}", metadata);
206
207         // Multiple snapshot files here mean that there were multiple snapshots for this seqNr - we delete all of them.
208         // Usually snapshot-stores would keep one snapshot per sequenceNr however here in the file-based one we
209         // timestamp snapshots and allow multiple to be kept around (for the same seqNr) if desired.
210
211         return Futures.future(() -> doDelete(metadata), executionContext);
212     }
213
214     @Override
215     public Future<Void> doDeleteAsync(final String persistenceId, final SnapshotSelectionCriteria criteria) {
216         LOG.debug("In doDeleteAsync - persistenceId: {}, criteria: {}", persistenceId, criteria);
217
218         return Futures.future(() -> doDelete(persistenceId, criteria), executionContext);
219     }
220
221     private Void doDelete(final String persistenceId, final SnapshotSelectionCriteria criteria) {
222         final List<File> files = getSnapshotMetadatas(persistenceId, criteria).stream()
223                 .flatMap(md -> Stream.of(toSnapshotFile(md))).collect(Collectors.toList());
224
225         LOG.debug("Deleting files: {}", files);
226
227         files.forEach(file -> {
228             try {
229                 Files.delete(file.toPath());
230             } catch (IOException | SecurityException e) {
231                 LOG.error("Unable to delete snapshot file: {}, persistenceId: {} ", file, persistenceId);
232             }
233         });
234         return null;
235     }
236
237     private Void doDelete(final SnapshotMetadata metadata) {
238         final Collection<File> files = getSnapshotFiles(metadata);
239
240         LOG.debug("Deleting files: {}", files);
241
242         files.forEach(file -> {
243             try {
244                 Files.delete(file.toPath());
245             } catch (IOException | SecurityException e) {
246                 LOG.error("Unable to delete snapshot file: {}", file);
247             }
248         });
249         return null;
250     }
251
252     private Collection<File> getSnapshotFiles(final String persistenceId) {
253         String encodedPersistenceId = encode(persistenceId);
254
255         File[] files = snapshotDir.listFiles((dir, name) -> {
256             int persistenceIdEndIndex = name.lastIndexOf('-', name.lastIndexOf('-') - 1);
257             return PERSISTENCE_ID_START_INDEX + encodedPersistenceId.length() == persistenceIdEndIndex
258                     && name.startsWith(encodedPersistenceId, PERSISTENCE_ID_START_INDEX) && !name.endsWith(".tmp");
259         });
260
261         if (files == null) {
262             return Collections.emptyList();
263         }
264
265         if (LOG.isDebugEnabled()) {
266             LOG.debug("getSnapshotFiles for persistenceId: {}, found files: {}", encodedPersistenceId,
267                     Arrays.toString(files));
268         }
269
270         return Arrays.asList(files);
271     }
272
273     private Collection<File> getSnapshotFiles(final SnapshotMetadata metadata) {
274         return getSnapshotFiles(metadata.persistenceId()).stream().filter(file -> {
275             SnapshotMetadata possible = extractMetadata(file);
276             return possible != null && possible.sequenceNr() == metadata.sequenceNr()
277                     && (metadata.timestamp() == 0L || possible.timestamp() == metadata.timestamp());
278         }).collect(Collectors.toList());
279     }
280
281     private Collection<SnapshotMetadata> getSnapshotMetadatas(final String persistenceId,
282             final SnapshotSelectionCriteria criteria) {
283         return getSnapshotFiles(persistenceId).stream().flatMap(file -> toStream(extractMetadata(file)))
284                 .filter(criteria::matches).collect(Collectors.toList());
285     }
286
287     private static Stream<SnapshotMetadata> toStream(final @Nullable SnapshotMetadata md) {
288         return md != null ? Stream.of(md) : Stream.empty();
289     }
290
291     private static @Nullable SnapshotMetadata extractMetadata(final File file) {
292         String name = file.getName();
293         int sequenceNumberEndIndex = name.lastIndexOf('-');
294         int persistenceIdEndIndex = name.lastIndexOf('-', sequenceNumberEndIndex - 1);
295         if (PERSISTENCE_ID_START_INDEX >= persistenceIdEndIndex) {
296             return null;
297         }
298
299         try {
300             // Since the persistenceId is url encoded in the filename, we need
301             // to decode relevant filename's part to obtain persistenceId back
302             String persistenceId = decode(name.substring(PERSISTENCE_ID_START_INDEX, persistenceIdEndIndex));
303             long sequenceNumber = Long.parseLong(name.substring(persistenceIdEndIndex + 1, sequenceNumberEndIndex));
304             long timestamp = Long.parseLong(name.substring(sequenceNumberEndIndex + 1));
305             return new SnapshotMetadata(persistenceId, sequenceNumber, timestamp);
306         } catch (NumberFormatException e) {
307             return null;
308         }
309     }
310
311     private File toSnapshotFile(final SnapshotMetadata metadata) {
312         return new File(snapshotDir, String.format("snapshot-%s-%d-%d", encode(metadata.persistenceId()),
313             metadata.sequenceNr(), metadata.timestamp()));
314     }
315
316     private static <T> Collector<T, ?, List<T>> reverse() {
317         return Collectors.collectingAndThen(Collectors.toList(), list -> {
318             Collections.reverse(list);
319             return list;
320         });
321     }
322
323     private static String encode(final String str) {
324         try {
325             return URLEncoder.encode(str, StandardCharsets.UTF_8.name());
326         } catch (UnsupportedEncodingException e) {
327             // Shouldn't happen
328             LOG.warn("Error encoding {}", str, e);
329             return str;
330         }
331     }
332
333     private static String decode(final String str) {
334         try {
335             return URLDecoder.decode(str, StandardCharsets.UTF_8.name());
336         } catch (final UnsupportedEncodingException e) {
337             // Shouldn't happen
338             LOG.warn("Error decoding {}", str, e);
339             return str;
340         }
341     }
342
343     @VisibleForTesting
344     static int compare(final SnapshotMetadata m1, final SnapshotMetadata m2) {
345         return (int) (!m1.persistenceId().equals(m2.persistenceId())
346                 ? m1.persistenceId().compareTo(m2.persistenceId()) :
347             m1.sequenceNr() != m2.sequenceNr() ? m1.sequenceNr() - m2.sequenceNr() :
348                 m1.timestamp() != m2.timestamp() ? m1.timestamp() - m2.timestamp() : 0);
349     }
350 }