38249dabd4bee4bb33f356cdfc1b42cb326a6a77
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / io / FileBackedOutputStream.java
1 /*
2  * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.io;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.FinalizablePhantomReference;
12 import com.google.common.base.FinalizableReferenceQueue;
13 import com.google.common.collect.Sets;
14 import com.google.common.io.ByteSource;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import java.io.ByteArrayInputStream;
17 import java.io.ByteArrayOutputStream;
18 import java.io.File;
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.io.OutputStream;
22 import java.nio.file.Files;
23 import java.util.Iterator;
24 import java.util.Set;
25 import javax.annotation.Nonnull;
26 import javax.annotation.Nullable;
27 import javax.annotation.concurrent.GuardedBy;
28 import javax.annotation.concurrent.ThreadSafe;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * An {@link OutputStream} that starts buffering to a byte array, but switches to file buffering once the data
34  * reaches a configurable size. This class is thread-safe.
35  *
36  * @author Thomas Pantelis
37  */
38 @ThreadSafe
39 public class FileBackedOutputStream extends OutputStream {
40     private static final Logger LOG = LoggerFactory.getLogger(FileBackedOutputStream.class);
41
42     /**
43      * This stores the Cleanup PhantomReference instances statically. This is necessary because PhantomReferences
44      * need a hard reference so they're not garbage collected. Once finalized, the Cleanup PhantomReference removes
45      * itself from this map and thus becomes eligible for garbage collection.
46      */
47     @VisibleForTesting
48     static final Set<Cleanup> REFERENCE_CACHE = Sets.newConcurrentHashSet();
49
50     /**
51      * Used as the ReferenceQueue for the Cleanup PhantomReferences.
52      */
53     private static final FinalizableReferenceQueue REFERENCE_QUEUE = new FinalizableReferenceQueue();
54
55     private final int fileThreshold;
56     private final String fileDirectory;
57
58     @GuardedBy("this")
59     private MemoryOutputStream memory = new MemoryOutputStream();
60
61     @GuardedBy("this")
62     private OutputStream out = memory;
63
64     @GuardedBy("this")
65     private File file;
66
67     @GuardedBy("this")
68     private ByteSource source;
69
70     private volatile long count;
71
72     /**
73      * Creates a new instance that uses the given file threshold, and does not reset the data when the
74      * {@link ByteSource} returned by {@link #asByteSource} is finalized.
75      *
76      * @param fileThreshold the number of bytes before the stream should switch to buffering to a file
77      * @param fileDirectory the directory in which to create the file if needed. If null, the default temp file
78      *                      location is used.
79      */
80     public FileBackedOutputStream(int fileThreshold, @Nullable String fileDirectory) {
81         this.fileThreshold = fileThreshold;
82         this.fileDirectory = fileDirectory;
83     }
84
85     /**
86      * Returns a readable {@link ByteSource} view of the data that has been written to this stream. This stream is
87      * closed and further attempts to write to it will result in an IOException.
88      *
89      * @return a ByteSource instance
90      * @throws IOException if close fails
91      */
92     @Nonnull
93     public synchronized ByteSource asByteSource() throws IOException {
94         close();
95
96         if (source == null) {
97             source = new ByteSource() {
98                 @Override
99                 public InputStream openStream() throws IOException {
100                     synchronized (FileBackedOutputStream.this) {
101                         if (file != null) {
102                             return Files.newInputStream(file.toPath());
103                         } else {
104                             return new ByteArrayInputStream(memory.getBuffer(), 0, memory.getCount());
105                         }
106                     }
107                 }
108
109                 @Override
110                 public long size() {
111                     return count;
112                 }
113             };
114         }
115
116         return source;
117     }
118
119     @Override
120     @SuppressFBWarnings(value = "VO_VOLATILE_INCREMENT", justification = "Findbugs erroneously complains that the "
121         + "increment of count needs to be atomic even though it is inside a synchronized block.")
122     public synchronized void write(int value) throws IOException {
123         possiblySwitchToFile(1);
124         out.write(value);
125         count++;
126     }
127
128     @Override
129     public synchronized void write(byte[] bytes) throws IOException {
130         write(bytes, 0, bytes.length);
131     }
132
133     @Override
134     public synchronized void write(byte[] bytes, int off, int len) throws IOException {
135         possiblySwitchToFile(len);
136         out.write(bytes, off, len);
137         count += len;
138     }
139
140     @Override
141     public synchronized void close() throws IOException {
142         if (out != null) {
143             OutputStream closeMe = out;
144             out = null;
145             closeMe.close();
146         }
147     }
148
149     @Override
150     public synchronized void flush() throws IOException {
151         if (out != null) {
152             out.flush();
153         }
154     }
155
156     public synchronized long getCount() {
157         return count;
158     }
159
160     /**
161      * Calls {@link #close} if not already closed and, if data was buffered to a file, deletes the file.
162      */
163     public synchronized void cleanup() {
164         LOG.debug("In cleanup");
165
166         closeQuietly();
167
168         if (file != null) {
169             Iterator<Cleanup> iter = REFERENCE_CACHE.iterator();
170             while (iter.hasNext()) {
171                 if (file.equals(iter.next().file)) {
172                     iter.remove();
173                     break;
174                 }
175             }
176
177             LOG.debug("cleanup - deleting temp file {}", file);
178
179             deleteFile(file);
180             file = null;
181         }
182     }
183
184     @GuardedBy("this")
185     private void closeQuietly() {
186         try {
187             close();
188         } catch (IOException e) {
189             LOG.warn("Error closing output stream {}", out, e);
190         }
191     }
192
193     /**
194      * Checks if writing {@code len} bytes would go over threshold, and switches to file buffering if so.
195      */
196     @GuardedBy("this")
197     private void possiblySwitchToFile(int len) throws IOException {
198         if (out == null) {
199             throw new IOException("Stream already closed");
200         }
201
202         if (file == null && memory.getCount() + len > fileThreshold) {
203             File temp = File.createTempFile("FileBackedOutputStream", null,
204                     fileDirectory == null ? null : new File(fileDirectory));
205             temp.deleteOnExit();
206
207             LOG.debug("Byte count {} has exceeded threshold {} - switching to file: {}", memory.getCount() + len,
208                     fileThreshold, temp);
209
210             OutputStream transfer = null;
211             try {
212                 transfer = Files.newOutputStream(temp.toPath());
213                 transfer.write(memory.getBuffer(), 0, memory.getCount());
214                 transfer.flush();
215
216                 // We've successfully transferred the data; switch to writing to file
217                 out = transfer;
218                 file = temp;
219                 memory = null;
220
221                 new Cleanup(this, file);
222             } catch (IOException e) {
223                 if (transfer != null) {
224                     try {
225                         transfer.close();
226                     } catch (IOException ex) {
227                         LOG.debug("Error closing temp file {}", temp, ex);
228                     }
229                 }
230
231                 deleteFile(temp);
232                 throw e;
233             }
234         }
235     }
236
237     private static void deleteFile(File file) {
238         if (!file.delete()) {
239             LOG.warn("Could not delete temp file {}", file);
240         }
241     }
242
243     /**
244      * ByteArrayOutputStream that exposes its internals for efficiency.
245      */
246     private static class MemoryOutputStream extends ByteArrayOutputStream {
247         byte[] getBuffer() {
248             return buf;
249         }
250
251         int getCount() {
252             return count;
253         }
254     }
255
256     /**
257      * PhantomReference that deletes the temp file when the FileBackedOutputStream is garbage collected.
258      */
259     private static class Cleanup extends FinalizablePhantomReference<FileBackedOutputStream> {
260         private final File file;
261
262         Cleanup(FileBackedOutputStream referent, File file) {
263             super(referent, REFERENCE_QUEUE);
264             this.file = file;
265
266             REFERENCE_CACHE.add(this);
267
268             LOG.debug("Added Cleanup for temp file {}", file);
269         }
270
271         @Override
272         public void finalizeReferent() {
273             LOG.debug("In finalizeReferent");
274
275             if (REFERENCE_CACHE.remove(this)) {
276                 LOG.debug("finalizeReferent - deleting temp file {}", file);
277                 deleteFile(file);
278             }
279         }
280     }
281 }