42c90b80d4df0ca70e6fbb95ba8f5be9b8cadb6b
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / persistence / LocalSnapshotStore.java
1 /*
2  * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.persistence;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11
12 import akka.actor.ExtendedActorSystem;
13 import akka.dispatch.Futures;
14 import akka.persistence.SelectedSnapshot;
15 import akka.persistence.SnapshotMetadata;
16 import akka.persistence.SnapshotSelectionCriteria;
17 import akka.persistence.serialization.Snapshot;
18 import akka.persistence.serialization.SnapshotSerializer;
19 import akka.persistence.snapshot.japi.SnapshotStore;
20 import akka.serialization.JavaSerializer;
21 import com.google.common.annotations.VisibleForTesting;
22 import com.google.common.io.ByteStreams;
23 import com.typesafe.config.Config;
24 import java.io.BufferedInputStream;
25 import java.io.BufferedOutputStream;
26 import java.io.File;
27 import java.io.FileInputStream;
28 import java.io.FileOutputStream;
29 import java.io.IOException;
30 import java.io.InputStream;
31 import java.io.ObjectInputStream;
32 import java.io.ObjectOutputStream;
33 import java.io.UnsupportedEncodingException;
34 import java.net.URLDecoder;
35 import java.net.URLEncoder;
36 import java.nio.charset.StandardCharsets;
37 import java.nio.file.Files;
38 import java.nio.file.StandardCopyOption;
39 import java.util.ArrayDeque;
40 import java.util.Arrays;
41 import java.util.Collection;
42 import java.util.Collections;
43 import java.util.Deque;
44 import java.util.List;
45 import java.util.Optional;
46 import java.util.concurrent.Callable;
47 import java.util.stream.Collector;
48 import java.util.stream.Collectors;
49 import java.util.stream.Stream;
50 import org.eclipse.jdt.annotation.Nullable;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53 import scala.concurrent.ExecutionContext;
54 import scala.concurrent.Future;
55
56 /**
57  * Akka SnapshotStore implementation backed by the local file system. This class was patterned after akka's
58  * LocalSnapshotStore class and exists because akka's version serializes to a byte[] before persisting
59  * to the file which will fail if the data reaches or exceeds Integer.MAX_VALUE in size. This class avoids that issue
60  * by serializing the data directly to the file.
61  *
62  * @author Thomas Pantelis
63  */
64 public class LocalSnapshotStore extends SnapshotStore {
65     private static final Logger LOG = LoggerFactory.getLogger(LocalSnapshotStore.class);
66
67     private static final int PERSISTENCE_ID_START_INDEX = "snapshot-".length();
68
69     private final ExecutionContext executionContext;
70     private final int maxLoadAttempts;
71     private final File snapshotDir;
72
73     public LocalSnapshotStore(final Config config) {
74         this.executionContext = context().system().dispatchers().lookup(config.getString("stream-dispatcher"));
75         snapshotDir = new File(config.getString("dir"));
76
77         int localMaxLoadAttempts = config.getInt("max-load-attempts");
78         maxLoadAttempts = localMaxLoadAttempts > 0 ? localMaxLoadAttempts : 1;
79
80         LOG.debug("LocalSnapshotStore ctor: snapshotDir: {}, maxLoadAttempts: {}", snapshotDir, maxLoadAttempts);
81     }
82
83     @Override
84     public void preStart() throws Exception {
85         if (!snapshotDir.isDirectory()) {
86             // Try to create the directory, on failure double check if someone else beat us to it.
87             if (!snapshotDir.mkdirs() && !snapshotDir.isDirectory()) {
88                 throw new IOException("Failed to create snapshot directory " + snapshotDir.getCanonicalPath());
89             }
90         }
91
92         super.preStart();
93     }
94
95     @Override
96     public Future<Optional<SelectedSnapshot>> doLoadAsync(final String persistenceId,
97                                                           final SnapshotSelectionCriteria criteria) {
98         LOG.debug("In doLoadAsync - persistenceId: {}, criteria: {}", persistenceId, criteria);
99
100         // Select the youngest 'maxLoadAttempts' snapshots that match the criteria. This may help in situations where
101         // saving of a snapshot could not be completed because of a JVM crash. Hence, an attempt to load that snapshot
102         // will fail but loading an older snapshot may succeed.
103
104         Deque<SnapshotMetadata> metadatas = getSnapshotMetadatas(persistenceId, criteria).stream()
105                 .sorted(LocalSnapshotStore::compare).collect(reverse()).stream().limit(maxLoadAttempts)
106                     .collect(Collectors.toCollection(ArrayDeque::new));
107
108         if (metadatas.isEmpty()) {
109             return Futures.successful(Optional.empty());
110         }
111
112         LOG.debug("doLoadAsync - found: {}", metadatas);
113
114         return Futures.future(() -> doLoad(metadatas), executionContext);
115     }
116
117     private Optional<SelectedSnapshot> doLoad(final Deque<SnapshotMetadata> metadatas) throws IOException {
118         SnapshotMetadata metadata = metadatas.removeFirst();
119         File file = toSnapshotFile(metadata);
120
121         LOG.debug("doLoad {}", file);
122
123         try {
124             Object data = deserialize(file);
125
126             LOG.debug("deserialized data: {}", data);
127
128             return Optional.of(new SelectedSnapshot(metadata, data));
129         } catch (IOException e) {
130             LOG.error("Error loading snapshot file {}, remaining attempts: {}", file, metadatas.size(), e);
131
132             if (metadatas.isEmpty()) {
133                 throw e;
134             }
135
136             return doLoad(metadatas);
137         }
138     }
139
140     private Object deserialize(final File file) throws IOException {
141         return JavaSerializer.currentSystem().withValue((ExtendedActorSystem) context().system(),
142             (Callable<Object>) () -> {
143                 try (ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(new FileInputStream(file)))) {
144                     return in.readObject();
145                 } catch (ClassNotFoundException e) {
146                     throw new IOException("Error loading snapshot file " + file, e);
147                 } catch (IOException e) {
148                     LOG.debug("Error loading snapshot file {}", file, e);
149                     return tryDeserializeAkkaSnapshot(file);
150                 }
151             });
152     }
153
154     private Object tryDeserializeAkkaSnapshot(final File file) throws IOException {
155         LOG.debug("tryDeserializeAkkaSnapshot {}", file);
156
157         // The snapshot was probably previously stored via akka's LocalSnapshotStore which wraps the data
158         // in a Snapshot instance and uses the SnapshotSerializer to serialize it to a byte[]. So we'll use
159         // the SnapshotSerializer to try to de-serialize it.
160
161         SnapshotSerializer snapshotSerializer = new SnapshotSerializer((ExtendedActorSystem) context().system());
162
163         try (InputStream in = new BufferedInputStream(new FileInputStream(file))) {
164             return ((Snapshot)snapshotSerializer.fromBinary(ByteStreams.toByteArray(in))).data();
165         }
166     }
167
168     @Override
169     public Future<Void> doSaveAsync(final SnapshotMetadata metadata, final Object snapshot) {
170         LOG.debug("In doSaveAsync - metadata: {}, snapshot: {}", metadata, snapshot);
171
172         return Futures.future(() -> doSave(metadata, snapshot), executionContext);
173     }
174
175     private Void doSave(final SnapshotMetadata metadata, final Object snapshot) throws IOException {
176         final File actual = toSnapshotFile(metadata);
177         final File temp = File.createTempFile(actual.getName(), null, snapshotDir);
178
179         LOG.debug("Saving to temp file: {}", temp);
180
181         try (ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(temp)))) {
182             out.writeObject(snapshot);
183         } catch (IOException e) {
184             LOG.error("Error saving snapshot file {}. Deleting file..", temp, e);
185             if (!temp.delete()) {
186                 LOG.error("Failed to successfully delete file {}", temp);
187             }
188             throw e;
189         }
190
191         LOG.debug("Renaming to: {}", actual);
192         try {
193             Files.move(temp.toPath(), actual.toPath(), StandardCopyOption.ATOMIC_MOVE);
194         } catch (IOException e) {
195             LOG.warn("Failed to move {} to {}. Deleting {}..", temp, actual, temp, e);
196             if (!temp.delete()) {
197                 LOG.error("Failed to successfully delete file {}", temp);
198             }
199             throw e;
200         }
201
202         return null;
203     }
204
205     @Override
206     public Future<Void> doDeleteAsync(final SnapshotMetadata metadata) {
207         LOG.debug("In doDeleteAsync - metadata: {}", metadata);
208
209         // Multiple snapshot files here mean that there were multiple snapshots for this seqNr - we delete all of them.
210         // Usually snapshot-stores would keep one snapshot per sequenceNr however here in the file-based one we
211         // timestamp snapshots and allow multiple to be kept around (for the same seqNr) if desired.
212
213         return Futures.future(() -> doDelete(metadata), executionContext);
214     }
215
216     @Override
217     public Future<Void> doDeleteAsync(final String persistenceId, final SnapshotSelectionCriteria criteria) {
218         LOG.debug("In doDeleteAsync - persistenceId: {}, criteria: {}", persistenceId, criteria);
219
220         return Futures.future(() -> doDelete(persistenceId, criteria), executionContext);
221     }
222
223     private Void doDelete(final String persistenceId, final SnapshotSelectionCriteria criteria) {
224         final List<File> files = getSnapshotMetadatas(persistenceId, criteria).stream()
225                 .flatMap(md -> Stream.of(toSnapshotFile(md))).collect(Collectors.toList());
226
227         LOG.debug("Deleting files: {}", files);
228
229         files.forEach(file -> {
230             try {
231                 Files.delete(file.toPath());
232             } catch (IOException | SecurityException e) {
233                 LOG.error("Unable to delete snapshot file: {}, persistenceId: {} ", file, persistenceId);
234             }
235         });
236         return null;
237     }
238
239     private Void doDelete(final SnapshotMetadata metadata) {
240         final Collection<File> files = getSnapshotFiles(metadata);
241
242         LOG.debug("Deleting files: {}", files);
243
244         files.forEach(file -> {
245             try {
246                 Files.delete(file.toPath());
247             } catch (IOException | SecurityException e) {
248                 LOG.error("Unable to delete snapshot file: {}", file);
249             }
250         });
251         return null;
252     }
253
254     private Collection<File> getSnapshotFiles(final String persistenceId) {
255         String encodedPersistenceId = encode(persistenceId);
256
257         File[] files = snapshotDir.listFiles((dir, name) -> {
258             int persistenceIdEndIndex = name.lastIndexOf('-', name.lastIndexOf('-') - 1);
259             return PERSISTENCE_ID_START_INDEX + encodedPersistenceId.length() == persistenceIdEndIndex
260                     && name.startsWith(encodedPersistenceId, PERSISTENCE_ID_START_INDEX) && !name.endsWith(".tmp");
261         });
262
263         if (files == null) {
264             return Collections.emptyList();
265         }
266
267         if (LOG.isDebugEnabled()) {
268             LOG.debug("getSnapshotFiles for persistenceId: {}, found files: {}", encodedPersistenceId,
269                     Arrays.toString(files));
270         }
271
272         return Arrays.asList(files);
273     }
274
275     private Collection<File> getSnapshotFiles(final SnapshotMetadata metadata) {
276         return getSnapshotFiles(metadata.persistenceId()).stream().filter(file -> {
277             SnapshotMetadata possible = extractMetadata(file);
278             return possible != null && possible.sequenceNr() == metadata.sequenceNr()
279                     && (metadata.timestamp() == 0L || possible.timestamp() == metadata.timestamp());
280         }).collect(Collectors.toList());
281     }
282
283     private Collection<SnapshotMetadata> getSnapshotMetadatas(final String persistenceId,
284             final SnapshotSelectionCriteria criteria) {
285         return getSnapshotFiles(persistenceId).stream().flatMap(file -> toStream(extractMetadata(file)))
286                 .filter(criteria::matches).collect(Collectors.toList());
287     }
288
289     private static Stream<SnapshotMetadata> toStream(final @Nullable SnapshotMetadata md) {
290         return md != null ? Stream.of(md) : Stream.empty();
291     }
292
293     private static @Nullable SnapshotMetadata extractMetadata(final File file) {
294         String name = file.getName();
295         int sequenceNumberEndIndex = name.lastIndexOf('-');
296         int persistenceIdEndIndex = name.lastIndexOf('-', sequenceNumberEndIndex - 1);
297         if (PERSISTENCE_ID_START_INDEX >= persistenceIdEndIndex) {
298             return null;
299         }
300
301         try {
302             // Since the persistenceId is url encoded in the filename, we need
303             // to decode relevant filename's part to obtain persistenceId back
304             String persistenceId = decode(name.substring(PERSISTENCE_ID_START_INDEX, persistenceIdEndIndex));
305             long sequenceNumber = Long.parseLong(name.substring(persistenceIdEndIndex + 1, sequenceNumberEndIndex));
306             long timestamp = Long.parseLong(name.substring(sequenceNumberEndIndex + 1));
307             return new SnapshotMetadata(persistenceId, sequenceNumber, timestamp);
308         } catch (NumberFormatException e) {
309             return null;
310         }
311     }
312
313     private File toSnapshotFile(final SnapshotMetadata metadata) {
314         return new File(snapshotDir, String.format("snapshot-%s-%d-%d", encode(metadata.persistenceId()),
315             metadata.sequenceNr(), metadata.timestamp()));
316     }
317
318     private static <T> Collector<T, ?, List<T>> reverse() {
319         return Collectors.collectingAndThen(Collectors.toList(), list -> {
320             Collections.reverse(list);
321             return list;
322         });
323     }
324
325     private static String encode(final String str) {
326         try {
327             return URLEncoder.encode(str, StandardCharsets.UTF_8.name());
328         } catch (UnsupportedEncodingException e) {
329             // Shouldn't happen
330             LOG.warn("Error encoding {}", str, e);
331             return str;
332         }
333     }
334
335     private static String decode(final String str) {
336         try {
337             return URLDecoder.decode(str, StandardCharsets.UTF_8.name());
338         } catch (final UnsupportedEncodingException e) {
339             // Shouldn't happen
340             LOG.warn("Error decoding {}", str, e);
341             return str;
342         }
343     }
344
345     @VisibleForTesting
346     static int compare(final SnapshotMetadata m1, final SnapshotMetadata m2) {
347         checkArgument(m1.persistenceId().equals(m2.persistenceId()),
348                 "Persistence id does not match. id1: %s, id2: %s", m1.persistenceId(), m2.persistenceId());
349         final int cmp = Long.compare(m1.timestamp(), m2.timestamp());
350         return cmp != 0 ? cmp : Long.compare(m1.sequenceNr(), m2.sequenceNr());
351     }
352 }