457736d5c4a458bc766ccf6f6f39d56ef2ab12bf
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / persistence / LocalSnapshotStore.java
1 /*
2  * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.persistence;
9
10 import akka.actor.ExtendedActorSystem;
11 import akka.dispatch.Futures;
12 import akka.persistence.SelectedSnapshot;
13 import akka.persistence.SnapshotMetadata;
14 import akka.persistence.SnapshotSelectionCriteria;
15 import akka.persistence.serialization.Snapshot;
16 import akka.persistence.serialization.SnapshotSerializer;
17 import akka.persistence.snapshot.japi.SnapshotStore;
18 import com.google.common.annotations.VisibleForTesting;
19 import com.google.common.io.ByteStreams;
20 import com.typesafe.config.Config;
21 import java.io.BufferedInputStream;
22 import java.io.File;
23 import java.io.FileInputStream;
24 import java.io.FileOutputStream;
25 import java.io.FilenameFilter;
26 import java.io.IOException;
27 import java.io.InputStream;
28 import java.io.ObjectInputStream;
29 import java.io.ObjectOutputStream;
30 import java.io.UnsupportedEncodingException;
31 import java.net.URLEncoder;
32 import java.nio.charset.StandardCharsets;
33 import java.util.ArrayDeque;
34 import java.util.Arrays;
35 import java.util.Collection;
36 import java.util.Collections;
37 import java.util.Deque;
38 import java.util.List;
39 import java.util.Optional;
40 import java.util.stream.Collector;
41 import java.util.stream.Collectors;
42 import java.util.stream.Stream;
43 import javax.annotation.Nullable;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46 import scala.concurrent.ExecutionContext;
47 import scala.concurrent.Future;
48
49 /**
50  * Akka SnapshotStore implementation backed by the local file system. This class was patterned after akka's
51  * LocalSnapshotStore class and exists because akka's version serializes to a byte[] before persisting
52  * to the file which will fail if the data reaches or exceeds Integer.MAX_VALUE in size. This class avoids that issue
53  * by serializing the data directly to the file.
54  *
55  * @author Thomas Pantelis
56  */
57 public class LocalSnapshotStore extends SnapshotStore {
58     private static final Logger LOG = LoggerFactory.getLogger(LocalSnapshotStore.class);
59
60     private static final int PERSISTENCE_ID_START_INDEX = "snapshot-".length();
61
62     private final ExecutionContext executionContext;
63     private final int maxLoadAttempts;
64     private final File snapshotDir;
65
66     public LocalSnapshotStore(Config config) {
67         this.executionContext = context().system().dispatchers().lookup(config.getString("stream-dispatcher"));
68         snapshotDir = new File(config.getString("dir"));
69
70         int localMaxLoadAttempts = config.getInt("max-load-attempts");
71         maxLoadAttempts = localMaxLoadAttempts > 0 ? localMaxLoadAttempts : 1;
72
73         LOG.debug("LocalSnapshotStore ctor: snapshotDir: {}, maxLoadAttempts: {}", snapshotDir, maxLoadAttempts);
74     }
75
76     @Override
77     public void preStart() throws Exception {
78         if (!snapshotDir.isDirectory()) {
79             // Try to create the directory, on failure double check if someone else beat us to it.
80             if (!snapshotDir.mkdirs() && !snapshotDir.isDirectory()) {
81                 throw new IOException("Failed to create snapshot directory " + snapshotDir.getCanonicalPath());
82             }
83         }
84
85         super.preStart();
86     }
87
88     @Override
89     public Future<Optional<SelectedSnapshot>> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
90         LOG.debug("In doLoadAsync - persistenceId: {}, criteria: {}", persistenceId, criteria);
91
92         // Select the youngest 'maxLoadAttempts' snapshots that match the criteria. This may help in situations where
93         // saving of a snapshot could not be completed because of a JVM crash. Hence, an attempt to load that snapshot
94         // will fail but loading an older snapshot may succeed.
95
96         Deque<SnapshotMetadata> metadatas = getSnapshotMetadatas(persistenceId, criteria).stream()
97                 .sorted(LocalSnapshotStore::compare).collect(reverse()).stream().limit(maxLoadAttempts)
98                     .collect(Collectors.toCollection(ArrayDeque::new));
99
100         if (metadatas.isEmpty()) {
101             return Futures.successful(Optional.empty());
102         }
103
104         LOG.debug("doLoadAsync - found: {}", metadatas);
105
106         return Futures.future(() -> doLoad(metadatas), executionContext);
107     }
108
109     private Optional<SelectedSnapshot> doLoad(Deque<SnapshotMetadata> metadatas) throws IOException {
110         SnapshotMetadata metadata = metadatas.removeFirst();
111         File file = toSnapshotFile(metadata, "");
112
113         LOG.debug("doLoad {}", file);
114
115         try {
116             Object data = deserialize(file);
117
118             LOG.debug("deserialized data: {}", data);
119
120             return Optional.of(new SelectedSnapshot(metadata, data));
121         } catch (IOException e) {
122             LOG.error("Error loading snapshot file {}, remaining attempts: {}", file, metadatas.size(), e);
123
124             if (metadatas.isEmpty()) {
125                 throw e;
126             }
127
128             return doLoad(metadatas);
129         }
130     }
131
132     private Object deserialize(File file) throws IOException {
133         try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(file))) {
134             return in.readObject();
135         } catch (ClassNotFoundException e) {
136             throw new IOException("Error loading snapshot file " + file, e);
137         } catch (IOException e) {
138             LOG.debug("Error loading snapshot file {}", file, e);
139
140             return tryDeserializeAkkaSnapshot(file);
141         }
142     }
143
144     private Object tryDeserializeAkkaSnapshot(File file) throws IOException {
145         LOG.debug("tryDeserializeAkkaSnapshot {}", file);
146
147         // The snapshot was probably previously stored via akka's LocalSnapshotStore which wraps the data
148         // in a Snapshot instance and uses the SnapshotSerializer to serialize it to a byte[]. So we'll use
149         // the SnapshotSerializer to try to de-serialize it.
150
151         SnapshotSerializer snapshotSerializer = new SnapshotSerializer((ExtendedActorSystem) context().system());
152
153         try (InputStream in = new BufferedInputStream(new FileInputStream(file))) {
154             return ((Snapshot)snapshotSerializer.fromBinary(ByteStreams.toByteArray(in))).data();
155         }
156     }
157
158     @Override
159     public Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot) {
160         LOG.debug("In doSaveAsync - metadata: {}, snapshot: {}", metadata, snapshot);
161
162         return Futures.future(() -> doSave(metadata, snapshot), executionContext);
163     }
164
165     private Void doSave(SnapshotMetadata metadata, Object snapshot) throws IOException {
166         File temp = toSnapshotFile(metadata, ".tmp");
167
168         LOG.debug("Saving to temp file: {}", temp);
169
170         try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(temp))) {
171             out.writeObject(snapshot);
172         } catch (IOException e) {
173             LOG.error("Error saving snapshot file {}", temp, e);
174             throw e;
175         }
176
177         File actual = toSnapshotFile(metadata, "");
178
179         LOG.debug("Renaming to: {}", actual);
180
181         if (!temp.renameTo(actual)) {
182             throw new IOException(String.format("Failed to rename %s to %s", temp, actual));
183         }
184
185         return null;
186     }
187
188     @Override
189     public Future<Void> doDeleteAsync(SnapshotMetadata metadata) {
190         LOG.debug("In doDeleteAsync - metadata: {}", metadata);
191
192         // Multiple snapshot files here mean that there were multiple snapshots for this seqNr - we delete all of them.
193         // Usually snapshot-stores would keep one snapshot per sequenceNr however here in the file-based one we
194         // timestamp snapshots and allow multiple to be kept around (for the same seqNr) if desired.
195
196         return Futures.future(() -> doDelete(metadata), executionContext);
197     }
198
199     @Override
200     public Future<Void> doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
201         LOG.debug("In doDeleteAsync - persistenceId: {}, criteria: {}", persistenceId, criteria);
202
203         return Futures.future(() -> doDelete(persistenceId, criteria), executionContext);
204     }
205
206     private Void doDelete(String persistenceId, SnapshotSelectionCriteria criteria) {
207         final List<File> files = getSnapshotMetadatas(persistenceId, criteria).stream()
208                 .flatMap(md -> Stream.of(toSnapshotFile(md, ""))).collect(Collectors.toList());
209
210         LOG.debug("Deleting files: {}", files);
211
212         files.forEach(file -> file.delete());
213         return null;
214     }
215
216     private Void doDelete(SnapshotMetadata metadata) {
217         final Collection<File> files = getSnapshotFiles(metadata);
218
219         LOG.debug("Deleting files: {}", files);
220
221         files.forEach(file -> file.delete());
222         return null;
223     }
224
225     private Collection<File> getSnapshotFiles(String persistenceId) {
226         String encodedPersistenceId = encode(persistenceId);
227
228         File[] files = snapshotDir.listFiles((FilenameFilter) (dir, name) -> {
229             int persistenceIdEndIndex = name.lastIndexOf('-', name.lastIndexOf('-') - 1);
230             return PERSISTENCE_ID_START_INDEX + encodedPersistenceId.length() == persistenceIdEndIndex
231                     && name.startsWith(encodedPersistenceId, PERSISTENCE_ID_START_INDEX) && !name.endsWith(".tmp");
232         });
233
234         if (files == null) {
235             return Collections.emptyList();
236         }
237
238         if (LOG.isDebugEnabled()) {
239             LOG.debug("getSnapshotFiles for persistenceId: {}, found files: {}", encodedPersistenceId,
240                     Arrays.toString(files));
241         }
242
243         return Arrays.asList(files);
244     }
245
246     private Collection<File> getSnapshotFiles(SnapshotMetadata metadata) {
247         return getSnapshotFiles(metadata.persistenceId()).stream().filter(file -> {
248             SnapshotMetadata possible = extractMetadata(file);
249             return possible != null && possible.sequenceNr() == metadata.sequenceNr()
250                     && (metadata.timestamp() == 0L || possible.timestamp() == metadata.timestamp());
251         }).collect(Collectors.toList());
252     }
253
254     private Collection<SnapshotMetadata> getSnapshotMetadatas(String persistenceId,
255             SnapshotSelectionCriteria criteria) {
256         return getSnapshotFiles(persistenceId).stream().flatMap(file -> toStream(extractMetadata(file)))
257                 .filter(md -> criteria.matches(md)).collect(Collectors.toList());
258     }
259
260     private static Stream<SnapshotMetadata> toStream(@Nullable SnapshotMetadata md) {
261         return md != null ? Stream.of(md) : Stream.empty();
262     }
263
264     @Nullable
265     private static SnapshotMetadata extractMetadata(File file) {
266         String name = file.getName();
267         int sequenceNumberEndIndex = name.lastIndexOf('-');
268         int persistenceIdEndIndex = name.lastIndexOf('-', sequenceNumberEndIndex - 1);
269         if (PERSISTENCE_ID_START_INDEX >= persistenceIdEndIndex) {
270             return null;
271         }
272
273         try {
274             String persistenceId = name.substring(PERSISTENCE_ID_START_INDEX, persistenceIdEndIndex);
275             long sequenceNumber = Long.parseLong(name.substring(persistenceIdEndIndex + 1, sequenceNumberEndIndex));
276             long timestamp = Long.parseLong(name.substring(sequenceNumberEndIndex + 1));
277             return new SnapshotMetadata(persistenceId, sequenceNumber, timestamp);
278         } catch (NumberFormatException e) {
279             return null;
280         }
281     }
282
283     private File toSnapshotFile(SnapshotMetadata metadata, String extension) {
284         return new File(snapshotDir, String.format("snapshot-%s-%d-%d%s", encode(metadata.persistenceId()),
285                 metadata.sequenceNr(), metadata.timestamp(), extension));
286     }
287
288     private static <T> Collector<T, ?, List<T>> reverse() {
289         return Collectors.collectingAndThen(Collectors.toList(), list -> {
290             Collections.reverse(list);
291             return list;
292         });
293     }
294
295     private String encode(String str) {
296         try {
297             return URLEncoder.encode(str, StandardCharsets.UTF_8.name());
298         } catch (UnsupportedEncodingException e) {
299             // Shouldn't happen
300             LOG.warn("Error encoding {}", str, e);
301             return str;
302         }
303     }
304
305     @VisibleForTesting
306     static int compare(SnapshotMetadata m1, SnapshotMetadata m2) {
307         return (int) (!m1.persistenceId().equals(m2.persistenceId())
308                 ? m1.persistenceId().compareTo(m2.persistenceId()) :
309             m1.sequenceNr() != m2.sequenceNr() ? m1.sequenceNr() - m2.sequenceNr() :
310                 m1.timestamp() != m2.timestamp() ? m1.timestamp() - m2.timestamp() : 0);
311     }
312 }