Bug 7521: Add FileBackedOutputStream and use for snapshot chunking
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / io / FileBackedOutputStream.java
1 /*
2  * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.io;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.FinalizablePhantomReference;
12 import com.google.common.base.FinalizableReferenceQueue;
13 import com.google.common.collect.Sets;
14 import com.google.common.io.ByteSource;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import java.io.ByteArrayInputStream;
17 import java.io.ByteArrayOutputStream;
18 import java.io.File;
19 import java.io.FileInputStream;
20 import java.io.FileOutputStream;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.util.Iterator;
25 import java.util.Set;
26 import javax.annotation.Nonnull;
27 import javax.annotation.Nullable;
28 import javax.annotation.concurrent.GuardedBy;
29 import javax.annotation.concurrent.ThreadSafe;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 /**
34  * An {@link OutputStream} that starts buffering to a byte array, but switches to file buffering once the data
35  * reaches a configurable size. This class is thread-safe.
36  *
37  * @author Thomas Pantelis
38  */
39 @ThreadSafe
40 public class FileBackedOutputStream extends OutputStream {
41     private static final Logger LOG = LoggerFactory.getLogger(FileBackedOutputStream.class);
42
43     /**
44      * This stores the Cleanup PhantomReference instances statically. This is necessary because PhantomReferences
45      * need a hard reference so they're not garbage collected. Once finalized, the Cleanup PhantomReference removes
46      * itself from this map and thus becomes eligible for garbage collection.
47      */
48     @VisibleForTesting
49     static final Set<Cleanup> REFERENCE_CACHE = Sets.newConcurrentHashSet();
50
51     /**
52      * Used as the ReferenceQueue for the Cleanup PhantomReferences.
53      */
54     private static final FinalizableReferenceQueue REFERENCE_QUEUE = new FinalizableReferenceQueue();
55
56     private final int fileThreshold;
57     private final String fileDirectory;
58
59     @GuardedBy("this")
60     private MemoryOutputStream memory = new MemoryOutputStream();
61
62     @GuardedBy("this")
63     private OutputStream out = memory;
64
65     @GuardedBy("this")
66     private File file;
67
68     @GuardedBy("this")
69     private ByteSource source;
70
71     private volatile long count;
72
73     /**
74      * Creates a new instance that uses the given file threshold, and does not reset the data when the
75      * {@link ByteSource} returned by {@link #asByteSource} is finalized.
76      *
77      * @param fileThreshold the number of bytes before the stream should switch to buffering to a file
78      * @param fileDirectory the directory in which to create the file if needed. If null, the default temp file
79      *                      location is used.
80      */
81     public FileBackedOutputStream(int fileThreshold, @Nullable String fileDirectory) {
82         this.fileThreshold = fileThreshold;
83         this.fileDirectory = fileDirectory;
84     }
85
86     /**
87      * Returns a readable {@link ByteSource} view of the data that has been written to this stream. This stream is
88      * closed and further attempts to write to it will result in an IOException.
89      *
90      * @return a ByteSource instance
91      * @throws IOException if close fails
92      */
93     @Nonnull
94     public synchronized ByteSource asByteSource() throws IOException {
95         close();
96
97         if (source == null) {
98             source = new ByteSource() {
99                 @Override
100                 public InputStream openStream() throws IOException {
101                     synchronized (FileBackedOutputStream.this) {
102                         if (file != null) {
103                             return new FileInputStream(file);
104                         } else {
105                             return new ByteArrayInputStream(memory.getBuffer(), 0, memory.getCount());
106                         }
107                     }
108                 }
109
110                 @Override
111                 public long size() {
112                     return count;
113                 }
114             };
115         }
116
117         return source;
118     }
119
120     @Override
121     @SuppressFBWarnings(value = "VO_VOLATILE_INCREMENT", justification = "Findbugs erroneously complains that the "
122         + "increment of count needs to be atomic even though it is inside a synchronized block.")
123     public synchronized void write(int value) throws IOException {
124         possiblySwitchToFile(1);
125         out.write(value);
126         count++;
127     }
128
129     @Override
130     public synchronized void write(byte[] bytes) throws IOException {
131         write(bytes, 0, bytes.length);
132     }
133
134     @Override
135     public synchronized void write(byte[] bytes, int off, int len) throws IOException {
136         possiblySwitchToFile(len);
137         out.write(bytes, off, len);
138         count += len;
139     }
140
141     @Override
142     public synchronized void close() throws IOException {
143         if (out != null) {
144             OutputStream closeMe = out;
145             out = null;
146             closeMe.close();
147         }
148     }
149
150     @Override
151     public synchronized void flush() throws IOException {
152         if (out != null) {
153             out.flush();
154         }
155     }
156
157     public synchronized long getCount() {
158         return count;
159     }
160
161     /**
162      * Calls {@link #close} if not already closed and, if data was buffered to a file, deletes the file.
163      */
164     public synchronized void cleanup() {
165         LOG.debug("In cleanup");
166
167         closeQuietly();
168
169         if (file != null) {
170             Iterator<Cleanup> iter = REFERENCE_CACHE.iterator();
171             while (iter.hasNext()) {
172                 if (file.equals(iter.next().file)) {
173                     iter.remove();
174                     break;
175                 }
176             }
177
178             LOG.debug("cleanup - deleting temp file {}", file);
179
180             deleteFile(file);
181             file = null;
182         }
183     }
184
185     @GuardedBy("this")
186     private void closeQuietly() {
187         try {
188             close();
189         } catch (IOException e) {
190             LOG.warn("Error closing output stream {}", out, e);
191         }
192     }
193
194     /**
195      * Checks if writing {@code len} bytes would go over threshold, and switches to file buffering if so.
196      */
197     @GuardedBy("this")
198     private void possiblySwitchToFile(int len) throws IOException {
199         if (out == null) {
200             throw new IOException("Stream already closed");
201         }
202
203         if (file == null && memory.getCount() + len > fileThreshold) {
204             File temp = File.createTempFile("FileBackedOutputStream", null, new File(fileDirectory));
205             temp.deleteOnExit();
206
207             LOG.debug("Byte count {} has exceeded threshold {} - switching to file: {}", memory.getCount() + len,
208                     fileThreshold, temp);
209
210             FileOutputStream transfer = null;
211             try {
212                 transfer = new FileOutputStream(temp);
213                 transfer.write(memory.getBuffer(), 0, memory.getCount());
214                 transfer.flush();
215
216                 // We've successfully transferred the data; switch to writing to file
217                 out = transfer;
218                 file = temp;
219                 memory = null;
220
221                 new Cleanup(this, file);
222             } catch (IOException e) {
223                 if (transfer != null) {
224                     try {
225                         transfer.close();
226                     } catch (IOException ex) {
227                         LOG.debug("Error closing temp file {}", temp, ex);
228                     }
229                 }
230
231                 deleteFile(temp);
232                 throw e;
233             }
234         }
235     }
236
237     private static void deleteFile(File file) {
238         if (!file.delete()) {
239             LOG.warn("Could not delete temp file {}", file);
240         }
241     }
242
243     /**
244      * ByteArrayOutputStream that exposes its internals for efficiency.
245      */
246     private static class MemoryOutputStream extends ByteArrayOutputStream {
247         byte[] getBuffer() {
248             return buf;
249         }
250
251         int getCount() {
252             return count;
253         }
254     }
255
256     /**
257      * PhantomReference that deletes the temp file when the FileBackedOutputStream is garbage collected.
258      */
259     private static class Cleanup extends FinalizablePhantomReference<FileBackedOutputStream> {
260         private final File file;
261
262         Cleanup(FileBackedOutputStream referent, File file) {
263             super(referent, REFERENCE_QUEUE);
264             this.file = file;
265
266             REFERENCE_CACHE.add(this);
267
268             LOG.debug("Added Cleanup for temp file {}", file);
269         }
270
271         @Override
272         public void finalizeReferent() {
273             LOG.debug("In finalizeReferent");
274
275             if (REFERENCE_CACHE.remove(this)) {
276                 LOG.debug("finalizeReferent - deleting temp file {}", file);
277                 deleteFile(file);
278             }
279         }
280     }
281 }