1c80b38a2a8d5af927c244eaaaa48cab02825201
[controller.git] /
1 /*
2  * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.persistence;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.typesafe.config.Config;
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.ObjectInputStream;
17 import java.io.ObjectOutputStream;
18 import java.net.URLDecoder;
19 import java.net.URLEncoder;
20 import java.nio.charset.StandardCharsets;
21 import java.nio.file.Files;
22 import java.nio.file.Path;
23 import java.nio.file.StandardCopyOption;
24 import java.util.ArrayDeque;
25 import java.util.Arrays;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.Deque;
29 import java.util.List;
30 import java.util.Optional;
31 import java.util.concurrent.Callable;
32 import java.util.stream.Collector;
33 import java.util.stream.Collectors;
34 import java.util.stream.Stream;
35 import org.apache.pekko.actor.ExtendedActorSystem;
36 import org.apache.pekko.dispatch.Futures;
37 import org.apache.pekko.persistence.SelectedSnapshot;
38 import org.apache.pekko.persistence.SnapshotMetadata;
39 import org.apache.pekko.persistence.SnapshotSelectionCriteria;
40 import org.apache.pekko.persistence.serialization.Snapshot;
41 import org.apache.pekko.persistence.serialization.SnapshotSerializer;
42 import org.apache.pekko.persistence.snapshot.japi.SnapshotStore;
43 import org.apache.pekko.serialization.JavaSerializer;
44 import org.eclipse.jdt.annotation.Nullable;
45 import org.opendaylight.raft.spi.FileStreamSource;
46 import org.opendaylight.raft.spi.InputOutputStreamFactory;
47 import org.opendaylight.raft.spi.Lz4BlockSize;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50 import scala.concurrent.ExecutionContext;
51 import scala.concurrent.Future;
52
53 /**
54  * Akka SnapshotStore implementation backed by the local file system. This class was patterned after akka's
55  * LocalSnapshotStore class and exists because akka's version serializes to a byte[] before persisting
56  * to the file which will fail if the data reaches or exceeds Integer.MAX_VALUE in size. This class avoids that issue
57  * by serializing the data directly to the file.
58  *
59  * @author Thomas Pantelis
60  */
61 public final class LocalSnapshotStore extends SnapshotStore {
62     private static final Logger LOG = LoggerFactory.getLogger(LocalSnapshotStore.class);
63     private static final int PERSISTENCE_ID_START_INDEX = "snapshot-".length();
64
65     private final InputOutputStreamFactory streamFactory;
66     private final ExecutionContext executionContext;
67     private final int maxLoadAttempts;
68     private final Path snapshotDir;
69
70     public LocalSnapshotStore(final Config config) {
71         executionContext = context().system().dispatchers().lookup(config.getString("stream-dispatcher"));
72         snapshotDir = Path.of(config.getString("dir"));
73
74         final int localMaxLoadAttempts = config.getInt("max-load-attempts");
75         maxLoadAttempts = localMaxLoadAttempts > 0 ? localMaxLoadAttempts : 1;
76
77         if (config.getBoolean("use-lz4-compression")) {
78             final var size = config.getString("lz4-blocksize");
79             final var blockSize = switch(size) {
80                 case "64KB" -> Lz4BlockSize.LZ4_64KB;
81                 case "256KB" -> Lz4BlockSize.LZ4_256KB;
82                 case "1MB" -> Lz4BlockSize.LZ4_1MB;
83                 case "4MB" -> Lz4BlockSize.LZ4_4MB;
84                 default -> throw new IllegalArgumentException("Invalid block size '" + size + "'");
85             };
86             streamFactory = InputOutputStreamFactory.lz4(blockSize);
87             LOG.debug("Using LZ4 Input/Output Stream, blocksize: {}", size);
88         } else {
89             streamFactory = InputOutputStreamFactory.simple();
90             LOG.debug("Using plain Input/Output Stream");
91         }
92
93         LOG.debug("LocalSnapshotStore ctor: snapshotDir: {}, maxLoadAttempts: {}", snapshotDir, maxLoadAttempts);
94     }
95
96     @Override
97     public void preStart() throws Exception {
98         if (!Files.isDirectory(snapshotDir)) {
99             // Try to create the directory including any non-existing parents
100             Files.createDirectories(snapshotDir);
101         }
102
103         super.preStart();
104     }
105
106     @Override
107     public Future<Optional<SelectedSnapshot>> doLoadAsync(final String persistenceId,
108                                                           final SnapshotSelectionCriteria criteria) {
109         LOG.debug("In doLoadAsync - persistenceId: {}, criteria: {}", persistenceId, criteria);
110
111         // Select the youngest 'maxLoadAttempts' snapshots that match the criteria. This may help in situations where
112         // saving of a snapshot could not be completed because of a JVM crash. Hence, an attempt to load that snapshot
113         // will fail but loading an older snapshot may succeed.
114
115         Deque<SnapshotMetadata> metadatas = getSnapshotMetadatas(persistenceId, criteria).stream()
116                 .sorted(LocalSnapshotStore::compare).collect(reverse()).stream().limit(maxLoadAttempts)
117                     .collect(Collectors.toCollection(ArrayDeque::new));
118
119         if (metadatas.isEmpty()) {
120             return Futures.successful(Optional.empty());
121         }
122
123         LOG.debug("doLoadAsync - found: {}", metadatas);
124
125         return Futures.future(() -> doLoad(metadatas), executionContext);
126     }
127
128     private Optional<SelectedSnapshot> doLoad(final Deque<SnapshotMetadata> metadatas) throws IOException {
129         SnapshotMetadata metadata = metadatas.removeFirst();
130         final var file = toSnapshotFile(metadata);
131
132         LOG.debug("doLoad {}", file);
133
134         try {
135             Object data = deserialize(file);
136
137             LOG.debug("deserialized data: {}", data);
138
139             return Optional.of(new SelectedSnapshot(metadata, data));
140         } catch (IOException e) {
141             LOG.error("Error loading snapshot file {}, remaining attempts: {}", file, metadatas.size(), e);
142
143             if (metadatas.isEmpty()) {
144                 throw e;
145             }
146
147             return doLoad(metadatas);
148         }
149     }
150
151     private Object deserialize(final Path file) throws IOException {
152         return JavaSerializer.currentSystem().withValue((ExtendedActorSystem) context().system(),
153             (Callable<Object>) () -> {
154                 try (var ois = new ObjectInputStream(streamFactory.createInputStream(
155                         new FileStreamSource(file, 0, Files.size(file))))) {
156                     return ois.readObject();
157                 } catch (ClassNotFoundException e) {
158                     throw new IOException("Error loading snapshot file " + file, e);
159                 } catch (IOException e) {
160                     LOG.debug("Error loading snapshot file {}", file, e);
161                     return tryDeserializeAkkaSnapshot(file);
162                 }
163             });
164     }
165
166     private Object tryDeserializeAkkaSnapshot(final Path file) throws IOException {
167         LOG.debug("tryDeserializeAkkaSnapshot {}", file);
168
169         // The snapshot was probably previously stored via akka's LocalSnapshotStore which wraps the data
170         // in a Snapshot instance and uses the SnapshotSerializer to serialize it to a byte[]. So we'll use
171         // the SnapshotSerializer to try to de-serialize it.
172
173         final var snapshotSerializer = new SnapshotSerializer((ExtendedActorSystem) context().system());
174         return ((Snapshot) snapshotSerializer.fromBinary(Files.readAllBytes(file))).data();
175     }
176
177     @Override
178     public Future<Void> doSaveAsync(final SnapshotMetadata metadata, final Object snapshot) {
179         LOG.debug("In doSaveAsync - metadata: {}, snapshot: {}", metadata, snapshot);
180
181         return Futures.future(() -> doSave(metadata, snapshot), executionContext);
182     }
183
184     private Void doSave(final SnapshotMetadata metadata, final Object snapshot) throws IOException {
185         final var actual = toSnapshotFile(metadata).toFile();
186         final var temp = File.createTempFile(actual.getName(), null, snapshotDir.toFile());
187
188         LOG.debug("Saving to temp file: {}", temp);
189
190         try (var out = new ObjectOutputStream(streamFactory.createOutputStream(temp))) {
191             out.writeObject(snapshot);
192         } catch (IOException e) {
193             LOG.error("Error saving snapshot file {}. Deleting file..", temp, e);
194             if (!temp.delete()) {
195                 LOG.error("Failed to successfully delete file {}", temp);
196             }
197             throw e;
198         }
199
200         LOG.debug("Renaming to: {}", actual);
201         try {
202             Files.move(temp.toPath(), actual.toPath(), StandardCopyOption.ATOMIC_MOVE);
203         } catch (IOException e) {
204             LOG.warn("Failed to move {} to {}. Deleting {}..", temp, actual, temp, e);
205             if (!temp.delete()) {
206                 LOG.error("Failed to successfully delete file {}", temp);
207             }
208             throw e;
209         }
210
211         return null;
212     }
213
214     @Override
215     public Future<Void> doDeleteAsync(final SnapshotMetadata metadata) {
216         LOG.debug("In doDeleteAsync - metadata: {}", metadata);
217
218         // Multiple snapshot files here mean that there were multiple snapshots for this seqNr - we delete all of them.
219         // Usually snapshot-stores would keep one snapshot per sequenceNr however here in the file-based one we
220         // timestamp snapshots and allow multiple to be kept around (for the same seqNr) if desired.
221
222         return Futures.future(() -> doDelete(metadata), executionContext);
223     }
224
225     @Override
226     public Future<Void> doDeleteAsync(final String persistenceId, final SnapshotSelectionCriteria criteria) {
227         LOG.debug("In doDeleteAsync - persistenceId: {}, criteria: {}", persistenceId, criteria);
228
229         return Futures.future(() -> doDelete(persistenceId, criteria), executionContext);
230     }
231
232     private Void doDelete(final String persistenceId, final SnapshotSelectionCriteria criteria) {
233         final var files = getSnapshotMetadatas(persistenceId, criteria).stream()
234                 .flatMap(md -> Stream.of(toSnapshotFile(md)))
235                 .collect(Collectors.toList());
236
237         LOG.debug("Deleting files: {}", files);
238
239         files.forEach(file -> {
240             try {
241                 Files.delete(file);
242             } catch (IOException | SecurityException e) {
243                 LOG.error("Unable to delete snapshot file: {}, persistenceId: {} ", file, persistenceId);
244             }
245         });
246         return null;
247     }
248
249     private Void doDelete(final SnapshotMetadata metadata) {
250         final Collection<File> files = getSnapshotFiles(metadata);
251
252         LOG.debug("Deleting files: {}", files);
253
254         files.forEach(file -> {
255             try {
256                 Files.delete(file.toPath());
257             } catch (IOException | SecurityException e) {
258                 LOG.error("Unable to delete snapshot file: {}", file);
259             }
260         });
261         return null;
262     }
263
264     private Collection<File> getSnapshotFiles(final String persistenceId) {
265         String encodedPersistenceId = encode(persistenceId);
266
267         final var files = snapshotDir.toFile().listFiles((dir, name) -> {
268             int persistenceIdEndIndex = name.lastIndexOf('-', name.lastIndexOf('-') - 1);
269             return PERSISTENCE_ID_START_INDEX + encodedPersistenceId.length() == persistenceIdEndIndex
270                     && name.startsWith(encodedPersistenceId, PERSISTENCE_ID_START_INDEX) && !name.endsWith(".tmp");
271         });
272
273         if (files == null) {
274             return Collections.emptyList();
275         }
276
277         if (LOG.isDebugEnabled()) {
278             LOG.debug("getSnapshotFiles for persistenceId: {}, found files: {}", encodedPersistenceId,
279                     Arrays.toString(files));
280         }
281
282         return Arrays.asList(files);
283     }
284
285     private Collection<File> getSnapshotFiles(final SnapshotMetadata metadata) {
286         return getSnapshotFiles(metadata.persistenceId()).stream().filter(file -> {
287             SnapshotMetadata possible = extractMetadata(file);
288             return possible != null && possible.sequenceNr() == metadata.sequenceNr()
289                     && (metadata.timestamp() == 0L || possible.timestamp() == metadata.timestamp());
290         }).collect(Collectors.toList());
291     }
292
293     private Collection<SnapshotMetadata> getSnapshotMetadatas(final String persistenceId,
294             final SnapshotSelectionCriteria criteria) {
295         return getSnapshotFiles(persistenceId).stream().flatMap(file -> toStream(extractMetadata(file)))
296                 .filter(criteria::matches).collect(Collectors.toList());
297     }
298
299     private static Stream<SnapshotMetadata> toStream(final @Nullable SnapshotMetadata md) {
300         return md != null ? Stream.of(md) : Stream.empty();
301     }
302
303     private static @Nullable SnapshotMetadata extractMetadata(final File file) {
304         String name = file.getName();
305         int sequenceNumberEndIndex = name.lastIndexOf('-');
306         int persistenceIdEndIndex = name.lastIndexOf('-', sequenceNumberEndIndex - 1);
307         if (PERSISTENCE_ID_START_INDEX >= persistenceIdEndIndex) {
308             return null;
309         }
310
311         try {
312             // Since the persistenceId is url encoded in the filename, we need
313             // to decode relevant filename's part to obtain persistenceId back
314             String persistenceId = decode(name.substring(PERSISTENCE_ID_START_INDEX, persistenceIdEndIndex));
315             long sequenceNumber = Long.parseLong(name.substring(persistenceIdEndIndex + 1, sequenceNumberEndIndex));
316             long timestamp = Long.parseLong(name.substring(sequenceNumberEndIndex + 1));
317             return new SnapshotMetadata(persistenceId, sequenceNumber, timestamp);
318         } catch (NumberFormatException e) {
319             return null;
320         }
321     }
322
323     private Path toSnapshotFile(final SnapshotMetadata metadata) {
324         return snapshotDir.resolve("snapshot-%s-%d-%d".formatted(encode(metadata.persistenceId()),
325             metadata.sequenceNr(), metadata.timestamp()));
326     }
327
328     private static <T> Collector<T, ?, List<T>> reverse() {
329         return Collectors.collectingAndThen(Collectors.toList(), list -> {
330             Collections.reverse(list);
331             return list;
332         });
333     }
334
335     private static String encode(final String str) {
336         return URLEncoder.encode(str, StandardCharsets.UTF_8);
337     }
338
339     private static String decode(final String str) {
340         return URLDecoder.decode(str, StandardCharsets.UTF_8);
341     }
342
343     @VisibleForTesting
344     static int compare(final SnapshotMetadata m1, final SnapshotMetadata m2) {
345         checkArgument(m1.persistenceId().equals(m2.persistenceId()),
346                 "Persistence id does not match. id1: %s, id2: %s", m1.persistenceId(), m2.persistenceId());
347         final int cmp = Long.compare(m1.timestamp(), m2.timestamp());
348         return cmp != 0 ? cmp : Long.compare(m1.sequenceNr(), m2.sequenceNr());
349     }
350 }