Reduce JSR305 proliferation
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / io / FileBackedOutputStream.java
1 /*
2  * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.io;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.FinalizablePhantomReference;
12 import com.google.common.base.FinalizableReferenceQueue;
13 import com.google.common.collect.Sets;
14 import com.google.common.io.ByteSource;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import java.io.ByteArrayInputStream;
17 import java.io.ByteArrayOutputStream;
18 import java.io.File;
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.io.OutputStream;
22 import java.nio.file.Files;
23 import java.util.Iterator;
24 import java.util.Set;
25 import javax.annotation.concurrent.GuardedBy;
26 import javax.annotation.concurrent.ThreadSafe;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.eclipse.jdt.annotation.Nullable;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * An {@link OutputStream} that starts buffering to a byte array, but switches to file buffering once the data
34  * reaches a configurable size. This class is thread-safe.
35  *
36  * @author Thomas Pantelis
37  */
38 @ThreadSafe
39 public class FileBackedOutputStream extends OutputStream {
40     private static final Logger LOG = LoggerFactory.getLogger(FileBackedOutputStream.class);
41
42     /**
43      * This stores the Cleanup PhantomReference instances statically. This is necessary because PhantomReferences
44      * need a hard reference so they're not garbage collected. Once finalized, the Cleanup PhantomReference removes
45      * itself from this map and thus becomes eligible for garbage collection.
46      */
47     @VisibleForTesting
48     static final Set<Cleanup> REFERENCE_CACHE = Sets.newConcurrentHashSet();
49
50     /**
51      * Used as the ReferenceQueue for the Cleanup PhantomReferences.
52      */
53     private static final FinalizableReferenceQueue REFERENCE_QUEUE = new FinalizableReferenceQueue();
54
55     private final int fileThreshold;
56     private final String fileDirectory;
57
58     @GuardedBy("this")
59     private MemoryOutputStream memory = new MemoryOutputStream();
60
61     @GuardedBy("this")
62     private OutputStream out = memory;
63
64     @GuardedBy("this")
65     private File file;
66
67     @GuardedBy("this")
68     private ByteSource source;
69
70     private volatile long count;
71
72     /**
73      * Creates a new instance that uses the given file threshold, and does not reset the data when the
74      * {@link ByteSource} returned by {@link #asByteSource} is finalized.
75      *
76      * @param fileThreshold the number of bytes before the stream should switch to buffering to a file
77      * @param fileDirectory the directory in which to create the file if needed. If null, the default temp file
78      *                      location is used.
79      */
80     public FileBackedOutputStream(int fileThreshold, @Nullable String fileDirectory) {
81         this.fileThreshold = fileThreshold;
82         this.fileDirectory = fileDirectory;
83     }
84
85     /**
86      * Returns a readable {@link ByteSource} view of the data that has been written to this stream. This stream is
87      * closed and further attempts to write to it will result in an IOException.
88      *
89      * @return a ByteSource instance
90      * @throws IOException if close fails
91      */
92     public synchronized @NonNull ByteSource asByteSource() throws IOException {
93         close();
94
95         if (source == null) {
96             source = new ByteSource() {
97                 @Override
98                 public InputStream openStream() throws IOException {
99                     synchronized (FileBackedOutputStream.this) {
100                         if (file != null) {
101                             return Files.newInputStream(file.toPath());
102                         } else {
103                             return new ByteArrayInputStream(memory.getBuffer(), 0, memory.getCount());
104                         }
105                     }
106                 }
107
108                 @Override
109                 public long size() {
110                     return count;
111                 }
112             };
113         }
114
115         return source;
116     }
117
118     @Override
119     @SuppressFBWarnings(value = "VO_VOLATILE_INCREMENT", justification = "Findbugs erroneously complains that the "
120         + "increment of count needs to be atomic even though it is inside a synchronized block.")
121     public synchronized void write(int value) throws IOException {
122         possiblySwitchToFile(1);
123         out.write(value);
124         count++;
125     }
126
127     @Override
128     public synchronized void write(byte[] bytes) throws IOException {
129         write(bytes, 0, bytes.length);
130     }
131
132     @Override
133     public synchronized void write(byte[] bytes, int off, int len) throws IOException {
134         possiblySwitchToFile(len);
135         out.write(bytes, off, len);
136         count += len;
137     }
138
139     @Override
140     public synchronized void close() throws IOException {
141         if (out != null) {
142             OutputStream closeMe = out;
143             out = null;
144             closeMe.close();
145         }
146     }
147
148     @Override
149     public synchronized void flush() throws IOException {
150         if (out != null) {
151             out.flush();
152         }
153     }
154
155     public synchronized long getCount() {
156         return count;
157     }
158
159     /**
160      * Calls {@link #close} if not already closed and, if data was buffered to a file, deletes the file.
161      */
162     public synchronized void cleanup() {
163         LOG.debug("In cleanup");
164
165         closeQuietly();
166
167         if (file != null) {
168             Iterator<Cleanup> iter = REFERENCE_CACHE.iterator();
169             while (iter.hasNext()) {
170                 if (file.equals(iter.next().file)) {
171                     iter.remove();
172                     break;
173                 }
174             }
175
176             LOG.debug("cleanup - deleting temp file {}", file);
177
178             deleteFile(file);
179             file = null;
180         }
181     }
182
183     @GuardedBy("this")
184     private void closeQuietly() {
185         try {
186             close();
187         } catch (IOException e) {
188             LOG.warn("Error closing output stream {}", out, e);
189         }
190     }
191
192     /**
193      * Checks if writing {@code len} bytes would go over threshold, and switches to file buffering if so.
194      */
195     @GuardedBy("this")
196     private void possiblySwitchToFile(int len) throws IOException {
197         if (out == null) {
198             throw new IOException("Stream already closed");
199         }
200
201         if (file == null && memory.getCount() + len > fileThreshold) {
202             File temp = File.createTempFile("FileBackedOutputStream", null,
203                     fileDirectory == null ? null : new File(fileDirectory));
204             temp.deleteOnExit();
205
206             LOG.debug("Byte count {} has exceeded threshold {} - switching to file: {}", memory.getCount() + len,
207                     fileThreshold, temp);
208
209             OutputStream transfer = null;
210             try {
211                 transfer = Files.newOutputStream(temp.toPath());
212                 transfer.write(memory.getBuffer(), 0, memory.getCount());
213                 transfer.flush();
214
215                 // We've successfully transferred the data; switch to writing to file
216                 out = transfer;
217                 file = temp;
218                 memory = null;
219
220                 new Cleanup(this, file);
221             } catch (IOException e) {
222                 if (transfer != null) {
223                     try {
224                         transfer.close();
225                     } catch (IOException ex) {
226                         LOG.debug("Error closing temp file {}", temp, ex);
227                     }
228                 }
229
230                 deleteFile(temp);
231                 throw e;
232             }
233         }
234     }
235
236     private static void deleteFile(File file) {
237         if (!file.delete()) {
238             LOG.warn("Could not delete temp file {}", file);
239         }
240     }
241
242     /**
243      * ByteArrayOutputStream that exposes its internals for efficiency.
244      */
245     private static class MemoryOutputStream extends ByteArrayOutputStream {
246         byte[] getBuffer() {
247             return buf;
248         }
249
250         int getCount() {
251             return count;
252         }
253     }
254
255     /**
256      * PhantomReference that deletes the temp file when the FileBackedOutputStream is garbage collected.
257      */
258     private static class Cleanup extends FinalizablePhantomReference<FileBackedOutputStream> {
259         private final File file;
260
261         Cleanup(FileBackedOutputStream referent, File file) {
262             super(referent, REFERENCE_QUEUE);
263             this.file = file;
264
265             REFERENCE_CACHE.add(this);
266
267             LOG.debug("Added Cleanup for temp file {}", file);
268         }
269
270         @Override
271         public void finalizeReferent() {
272             LOG.debug("In finalizeReferent");
273
274             if (REFERENCE_CACHE.remove(this)) {
275                 LOG.debug("finalizeReferent - deleting temp file {}", file);
276                 deleteFile(file);
277             }
278         }
279     }
280 }