Fix some sonar/checkstyle issues
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / persistence / LocalSnapshotStore.java
1 /*
2  * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.persistence;
9
10 import akka.actor.ExtendedActorSystem;
11 import akka.dispatch.Futures;
12 import akka.persistence.SelectedSnapshot;
13 import akka.persistence.SnapshotMetadata;
14 import akka.persistence.SnapshotSelectionCriteria;
15 import akka.persistence.serialization.Snapshot;
16 import akka.persistence.serialization.SnapshotSerializer;
17 import akka.persistence.snapshot.japi.SnapshotStore;
18 import com.google.common.annotations.VisibleForTesting;
19 import com.google.common.io.ByteStreams;
20 import com.typesafe.config.Config;
21 import java.io.BufferedInputStream;
22 import java.io.File;
23 import java.io.FileInputStream;
24 import java.io.FileOutputStream;
25 import java.io.IOException;
26 import java.io.InputStream;
27 import java.io.ObjectInputStream;
28 import java.io.ObjectOutputStream;
29 import java.io.UnsupportedEncodingException;
30 import java.net.URLDecoder;
31 import java.net.URLEncoder;
32 import java.nio.charset.StandardCharsets;
33 import java.nio.file.Files;
34 import java.nio.file.StandardCopyOption;
35 import java.util.ArrayDeque;
36 import java.util.Arrays;
37 import java.util.Collection;
38 import java.util.Collections;
39 import java.util.Deque;
40 import java.util.List;
41 import java.util.Optional;
42 import java.util.stream.Collector;
43 import java.util.stream.Collectors;
44 import java.util.stream.Stream;
45 import javax.annotation.Nullable;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 import scala.concurrent.ExecutionContext;
49 import scala.concurrent.Future;
50
51 /**
52  * Akka SnapshotStore implementation backed by the local file system. This class was patterned after akka's
53  * LocalSnapshotStore class and exists because akka's version serializes to a byte[] before persisting
54  * to the file which will fail if the data reaches or exceeds Integer.MAX_VALUE in size. This class avoids that issue
55  * by serializing the data directly to the file.
56  *
57  * @author Thomas Pantelis
58  */
59 public class LocalSnapshotStore extends SnapshotStore {
60     private static final Logger LOG = LoggerFactory.getLogger(LocalSnapshotStore.class);
61
62     private static final int PERSISTENCE_ID_START_INDEX = "snapshot-".length();
63
64     private final ExecutionContext executionContext;
65     private final int maxLoadAttempts;
66     private final File snapshotDir;
67
68     public LocalSnapshotStore(final Config config) {
69         this.executionContext = context().system().dispatchers().lookup(config.getString("stream-dispatcher"));
70         snapshotDir = new File(config.getString("dir"));
71
72         int localMaxLoadAttempts = config.getInt("max-load-attempts");
73         maxLoadAttempts = localMaxLoadAttempts > 0 ? localMaxLoadAttempts : 1;
74
75         LOG.debug("LocalSnapshotStore ctor: snapshotDir: {}, maxLoadAttempts: {}", snapshotDir, maxLoadAttempts);
76     }
77
78     @Override
79     public void preStart() throws Exception {
80         if (!snapshotDir.isDirectory()) {
81             // Try to create the directory, on failure double check if someone else beat us to it.
82             if (!snapshotDir.mkdirs() && !snapshotDir.isDirectory()) {
83                 throw new IOException("Failed to create snapshot directory " + snapshotDir.getCanonicalPath());
84             }
85         }
86
87         super.preStart();
88     }
89
90     @Override
91     public Future<Optional<SelectedSnapshot>> doLoadAsync(final String persistenceId,
92                                                           final SnapshotSelectionCriteria criteria) {
93         LOG.debug("In doLoadAsync - persistenceId: {}, criteria: {}", persistenceId, criteria);
94
95         // Select the youngest 'maxLoadAttempts' snapshots that match the criteria. This may help in situations where
96         // saving of a snapshot could not be completed because of a JVM crash. Hence, an attempt to load that snapshot
97         // will fail but loading an older snapshot may succeed.
98
99         Deque<SnapshotMetadata> metadatas = getSnapshotMetadatas(persistenceId, criteria).stream()
100                 .sorted(LocalSnapshotStore::compare).collect(reverse()).stream().limit(maxLoadAttempts)
101                     .collect(Collectors.toCollection(ArrayDeque::new));
102
103         if (metadatas.isEmpty()) {
104             return Futures.successful(Optional.empty());
105         }
106
107         LOG.debug("doLoadAsync - found: {}", metadatas);
108
109         return Futures.future(() -> doLoad(metadatas), executionContext);
110     }
111
112     private Optional<SelectedSnapshot> doLoad(final Deque<SnapshotMetadata> metadatas) throws IOException {
113         SnapshotMetadata metadata = metadatas.removeFirst();
114         File file = toSnapshotFile(metadata);
115
116         LOG.debug("doLoad {}", file);
117
118         try {
119             Object data = deserialize(file);
120
121             LOG.debug("deserialized data: {}", data);
122
123             return Optional.of(new SelectedSnapshot(metadata, data));
124         } catch (IOException e) {
125             LOG.error("Error loading snapshot file {}, remaining attempts: {}", file, metadatas.size(), e);
126
127             if (metadatas.isEmpty()) {
128                 throw e;
129             }
130
131             return doLoad(metadatas);
132         }
133     }
134
135     private Object deserialize(final File file) throws IOException {
136         try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(file))) {
137             return in.readObject();
138         } catch (ClassNotFoundException e) {
139             throw new IOException("Error loading snapshot file " + file, e);
140         } catch (IOException e) {
141             LOG.debug("Error loading snapshot file {}", file, e);
142
143             return tryDeserializeAkkaSnapshot(file);
144         }
145     }
146
147     private Object tryDeserializeAkkaSnapshot(final File file) throws IOException {
148         LOG.debug("tryDeserializeAkkaSnapshot {}", file);
149
150         // The snapshot was probably previously stored via akka's LocalSnapshotStore which wraps the data
151         // in a Snapshot instance and uses the SnapshotSerializer to serialize it to a byte[]. So we'll use
152         // the SnapshotSerializer to try to de-serialize it.
153
154         SnapshotSerializer snapshotSerializer = new SnapshotSerializer((ExtendedActorSystem) context().system());
155
156         try (InputStream in = new BufferedInputStream(new FileInputStream(file))) {
157             return ((Snapshot)snapshotSerializer.fromBinary(ByteStreams.toByteArray(in))).data();
158         }
159     }
160
161     @Override
162     public Future<Void> doSaveAsync(final SnapshotMetadata metadata, final Object snapshot) {
163         LOG.debug("In doSaveAsync - metadata: {}, snapshot: {}", metadata, snapshot);
164
165         return Futures.future(() -> doSave(metadata, snapshot), executionContext);
166     }
167
168     private Void doSave(final SnapshotMetadata metadata, final Object snapshot) throws IOException {
169         final File actual = toSnapshotFile(metadata);
170         final File temp = File.createTempFile(actual.getName(), null, snapshotDir);
171
172         LOG.debug("Saving to temp file: {}", temp);
173
174         try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(temp))) {
175             out.writeObject(snapshot);
176         } catch (IOException e) {
177             LOG.error("Error saving snapshot file {}. Deleting file..", temp, e);
178             if (!temp.delete()) {
179                 LOG.error("Failed to successfully delete file {}", temp);
180             }
181             throw e;
182         }
183
184         LOG.debug("Renaming to: {}", actual);
185         try {
186             Files.move(temp.toPath(), actual.toPath(), StandardCopyOption.ATOMIC_MOVE);
187         } catch (IOException e) {
188             LOG.warn("Failed to move {} to {}. Deleting {}..", temp, actual, temp, e);
189             if (!temp.delete()) {
190                 LOG.error("Failed to successfully delete file {}", temp);
191             }
192             throw e;
193         }
194
195         return null;
196     }
197
198     @Override
199     public Future<Void> doDeleteAsync(final SnapshotMetadata metadata) {
200         LOG.debug("In doDeleteAsync - metadata: {}", metadata);
201
202         // Multiple snapshot files here mean that there were multiple snapshots for this seqNr - we delete all of them.
203         // Usually snapshot-stores would keep one snapshot per sequenceNr however here in the file-based one we
204         // timestamp snapshots and allow multiple to be kept around (for the same seqNr) if desired.
205
206         return Futures.future(() -> doDelete(metadata), executionContext);
207     }
208
209     @Override
210     public Future<Void> doDeleteAsync(final String persistenceId, final SnapshotSelectionCriteria criteria) {
211         LOG.debug("In doDeleteAsync - persistenceId: {}, criteria: {}", persistenceId, criteria);
212
213         return Futures.future(() -> doDelete(persistenceId, criteria), executionContext);
214     }
215
216     private Void doDelete(final String persistenceId, final SnapshotSelectionCriteria criteria) {
217         final List<File> files = getSnapshotMetadatas(persistenceId, criteria).stream()
218                 .flatMap(md -> Stream.of(toSnapshotFile(md))).collect(Collectors.toList());
219
220         LOG.debug("Deleting files: {}", files);
221
222         files.forEach(File::delete);
223         return null;
224     }
225
226     private Void doDelete(final SnapshotMetadata metadata) {
227         final Collection<File> files = getSnapshotFiles(metadata);
228
229         LOG.debug("Deleting files: {}", files);
230
231         files.forEach(File::delete);
232         return null;
233     }
234
235     private Collection<File> getSnapshotFiles(final String persistenceId) {
236         String encodedPersistenceId = encode(persistenceId);
237
238         File[] files = snapshotDir.listFiles((dir, name) -> {
239             int persistenceIdEndIndex = name.lastIndexOf('-', name.lastIndexOf('-') - 1);
240             return PERSISTENCE_ID_START_INDEX + encodedPersistenceId.length() == persistenceIdEndIndex
241                     && name.startsWith(encodedPersistenceId, PERSISTENCE_ID_START_INDEX) && !name.endsWith(".tmp");
242         });
243
244         if (files == null) {
245             return Collections.emptyList();
246         }
247
248         if (LOG.isDebugEnabled()) {
249             LOG.debug("getSnapshotFiles for persistenceId: {}, found files: {}", encodedPersistenceId,
250                     Arrays.toString(files));
251         }
252
253         return Arrays.asList(files);
254     }
255
256     private Collection<File> getSnapshotFiles(final SnapshotMetadata metadata) {
257         return getSnapshotFiles(metadata.persistenceId()).stream().filter(file -> {
258             SnapshotMetadata possible = extractMetadata(file);
259             return possible != null && possible.sequenceNr() == metadata.sequenceNr()
260                     && (metadata.timestamp() == 0L || possible.timestamp() == metadata.timestamp());
261         }).collect(Collectors.toList());
262     }
263
264     private Collection<SnapshotMetadata> getSnapshotMetadatas(final String persistenceId,
265             final SnapshotSelectionCriteria criteria) {
266         return getSnapshotFiles(persistenceId).stream().flatMap(file -> toStream(extractMetadata(file)))
267                 .filter(criteria::matches).collect(Collectors.toList());
268     }
269
270     private static Stream<SnapshotMetadata> toStream(@Nullable final SnapshotMetadata md) {
271         return md != null ? Stream.of(md) : Stream.empty();
272     }
273
274     @Nullable
275     private static SnapshotMetadata extractMetadata(final File file) {
276         String name = file.getName();
277         int sequenceNumberEndIndex = name.lastIndexOf('-');
278         int persistenceIdEndIndex = name.lastIndexOf('-', sequenceNumberEndIndex - 1);
279         if (PERSISTENCE_ID_START_INDEX >= persistenceIdEndIndex) {
280             return null;
281         }
282
283         try {
284             // Since the persistenceId is url encoded in the filename, we need
285             // to decode relevant filename's part to obtain persistenceId back
286             String persistenceId = decode(name.substring(PERSISTENCE_ID_START_INDEX, persistenceIdEndIndex));
287             long sequenceNumber = Long.parseLong(name.substring(persistenceIdEndIndex + 1, sequenceNumberEndIndex));
288             long timestamp = Long.parseLong(name.substring(sequenceNumberEndIndex + 1));
289             return new SnapshotMetadata(persistenceId, sequenceNumber, timestamp);
290         } catch (NumberFormatException e) {
291             return null;
292         }
293     }
294
295     private File toSnapshotFile(final SnapshotMetadata metadata) {
296         return new File(snapshotDir, String.format("snapshot-%s-%d-%d", encode(metadata.persistenceId()),
297             metadata.sequenceNr(), metadata.timestamp()));
298     }
299
300     private static <T> Collector<T, ?, List<T>> reverse() {
301         return Collectors.collectingAndThen(Collectors.toList(), list -> {
302             Collections.reverse(list);
303             return list;
304         });
305     }
306
307     private static String encode(final String str) {
308         try {
309             return URLEncoder.encode(str, StandardCharsets.UTF_8.name());
310         } catch (UnsupportedEncodingException e) {
311             // Shouldn't happen
312             LOG.warn("Error encoding {}", str, e);
313             return str;
314         }
315     }
316
317     private static String decode(final String str) {
318         try {
319             return URLDecoder.decode(str, StandardCharsets.UTF_8.name());
320         } catch (final UnsupportedEncodingException e) {
321             // Shouldn't happen
322             LOG.warn("Error decoding {}", str, e);
323             return str;
324         }
325     }
326
327     @VisibleForTesting
328     static int compare(final SnapshotMetadata m1, final SnapshotMetadata m2) {
329         return (int) (!m1.persistenceId().equals(m2.persistenceId())
330                 ? m1.persistenceId().compareTo(m2.persistenceId()) :
331             m1.sequenceNr() != m2.sequenceNr() ? m1.sequenceNr() - m2.sequenceNr() :
332                 m1.timestamp() != m2.timestamp() ? m1.timestamp() - m2.timestamp() : 0);
333     }
334 }