970b06f5c2fc3d4995100920efc0227d7ce8cd1c
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / io / FileBackedOutputStream.java
1 /*
2  * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.io;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.FinalizablePhantomReference;
12 import com.google.common.base.FinalizableReferenceQueue;
13 import com.google.common.collect.Sets;
14 import com.google.common.io.ByteSource;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import java.io.ByteArrayInputStream;
17 import java.io.ByteArrayOutputStream;
18 import java.io.File;
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.io.OutputStream;
22 import java.nio.file.Files;
23 import java.util.Iterator;
24 import java.util.Set;
25 import org.checkerframework.checker.lock.qual.GuardedBy;
26 import org.checkerframework.checker.lock.qual.Holding;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.eclipse.jdt.annotation.Nullable;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * An {@link OutputStream} that starts buffering to a byte array, but switches to file buffering once the data
34  * reaches a configurable size. This class is thread-safe.
35  *
36  * @author Thomas Pantelis
37  */
38 public class FileBackedOutputStream extends OutputStream {
39     private static final Logger LOG = LoggerFactory.getLogger(FileBackedOutputStream.class);
40
41     /**
42      * This stores the Cleanup PhantomReference instances statically. This is necessary because PhantomReferences
43      * need a hard reference so they're not garbage collected. Once finalized, the Cleanup PhantomReference removes
44      * itself from this map and thus becomes eligible for garbage collection.
45      */
46     @VisibleForTesting
47     static final Set<Cleanup> REFERENCE_CACHE = Sets.newConcurrentHashSet();
48
49     /**
50      * Used as the ReferenceQueue for the Cleanup PhantomReferences.
51      */
52     private static final FinalizableReferenceQueue REFERENCE_QUEUE = new FinalizableReferenceQueue();
53
54     private final int fileThreshold;
55     private final String fileDirectory;
56
57     @GuardedBy("this")
58     private MemoryOutputStream memory = new MemoryOutputStream();
59
60     @GuardedBy("this")
61     private OutputStream out = memory;
62
63     @GuardedBy("this")
64     private File file;
65
66     @GuardedBy("this")
67     private ByteSource source;
68
69     private volatile long count;
70
71     /**
72      * Creates a new instance that uses the given file threshold, and does not reset the data when the
73      * {@link ByteSource} returned by {@link #asByteSource} is finalized.
74      *
75      * @param fileThreshold the number of bytes before the stream should switch to buffering to a file
76      * @param fileDirectory the directory in which to create the file if needed. If null, the default temp file
77      *                      location is used.
78      */
79     public FileBackedOutputStream(final int fileThreshold, @Nullable final String fileDirectory) {
80         this.fileThreshold = fileThreshold;
81         this.fileDirectory = fileDirectory;
82     }
83
84     /**
85      * Returns a readable {@link ByteSource} view of the data that has been written to this stream. This stream is
86      * closed and further attempts to write to it will result in an IOException.
87      *
88      * @return a ByteSource instance
89      * @throws IOException if close fails
90      */
91     public synchronized @NonNull ByteSource asByteSource() throws IOException {
92         close();
93
94         if (source == null) {
95             source = new ByteSource() {
96                 @Override
97                 public InputStream openStream() throws IOException {
98                     synchronized (FileBackedOutputStream.this) {
99                         if (file != null) {
100                             return Files.newInputStream(file.toPath());
101                         } else {
102                             return new ByteArrayInputStream(memory.getBuffer(), 0, memory.getCount());
103                         }
104                     }
105                 }
106
107                 @Override
108                 public long size() {
109                     return count;
110                 }
111             };
112         }
113
114         return source;
115     }
116
117     @Override
118     @SuppressFBWarnings(value = "VO_VOLATILE_INCREMENT", justification = "Findbugs erroneously complains that the "
119         + "increment of count needs to be atomic even though it is inside a synchronized block.")
120     public synchronized void write(final int value) throws IOException {
121         possiblySwitchToFile(1);
122         out.write(value);
123         count++;
124     }
125
126     @Override
127     public synchronized void write(final byte[] bytes) throws IOException {
128         write(bytes, 0, bytes.length);
129     }
130
131     @Override
132     public synchronized void write(final byte[] bytes, final int off, final int len) throws IOException {
133         possiblySwitchToFile(len);
134         out.write(bytes, off, len);
135         count += len;
136     }
137
138     @Override
139     public synchronized void close() throws IOException {
140         if (out != null) {
141             OutputStream closeMe = out;
142             out = null;
143             closeMe.close();
144         }
145     }
146
147     @Override
148     public synchronized void flush() throws IOException {
149         if (out != null) {
150             out.flush();
151         }
152     }
153
154     public synchronized long getCount() {
155         return count;
156     }
157
158     /**
159      * Calls {@link #close} if not already closed and, if data was buffered to a file, deletes the file.
160      */
161     public synchronized void cleanup() {
162         LOG.debug("In cleanup");
163
164         closeQuietly();
165
166         if (file != null) {
167             Iterator<Cleanup> iter = REFERENCE_CACHE.iterator();
168             while (iter.hasNext()) {
169                 if (file.equals(iter.next().file)) {
170                     iter.remove();
171                     break;
172                 }
173             }
174
175             LOG.debug("cleanup - deleting temp file {}", file);
176
177             deleteFile(file);
178             file = null;
179         }
180     }
181
182     @Holding("this")
183     private void closeQuietly() {
184         try {
185             close();
186         } catch (IOException e) {
187             LOG.warn("Error closing output stream {}", out, e);
188         }
189     }
190
191     /**
192      * Checks if writing {@code len} bytes would go over threshold, and switches to file buffering if so.
193      */
194     @Holding("this")
195     private void possiblySwitchToFile(final int len) throws IOException {
196         if (out == null) {
197             throw new IOException("Stream already closed");
198         }
199
200         if (file == null && memory.getCount() + len > fileThreshold) {
201             File temp = File.createTempFile("FileBackedOutputStream", null,
202                     fileDirectory == null ? null : new File(fileDirectory));
203             temp.deleteOnExit();
204
205             LOG.debug("Byte count {} has exceeded threshold {} - switching to file: {}", memory.getCount() + len,
206                     fileThreshold, temp);
207
208             OutputStream transfer = null;
209             try {
210                 transfer = Files.newOutputStream(temp.toPath());
211                 transfer.write(memory.getBuffer(), 0, memory.getCount());
212                 transfer.flush();
213
214                 // We've successfully transferred the data; switch to writing to file
215                 out = transfer;
216                 file = temp;
217                 memory = null;
218
219                 new Cleanup(this, file);
220             } catch (IOException e) {
221                 if (transfer != null) {
222                     try {
223                         transfer.close();
224                     } catch (IOException ex) {
225                         LOG.debug("Error closing temp file {}", temp, ex);
226                     }
227                 }
228
229                 deleteFile(temp);
230                 throw e;
231             }
232         }
233     }
234
235     private static void deleteFile(final File file) {
236         if (!file.delete()) {
237             LOG.warn("Could not delete temp file {}", file);
238         }
239     }
240
241     /**
242      * ByteArrayOutputStream that exposes its internals for efficiency.
243      */
244     private static class MemoryOutputStream extends ByteArrayOutputStream {
245         byte[] getBuffer() {
246             return buf;
247         }
248
249         int getCount() {
250             return count;
251         }
252     }
253
254     /**
255      * PhantomReference that deletes the temp file when the FileBackedOutputStream is garbage collected.
256      */
257     private static class Cleanup extends FinalizablePhantomReference<FileBackedOutputStream> {
258         private final File file;
259
260         Cleanup(final FileBackedOutputStream referent, final File file) {
261             super(referent, REFERENCE_QUEUE);
262             this.file = file;
263
264             REFERENCE_CACHE.add(this);
265
266             LOG.debug("Added Cleanup for temp file {}", file);
267         }
268
269         @Override
270         public void finalizeReferent() {
271             LOG.debug("In finalizeReferent");
272
273             if (REFERENCE_CACHE.remove(this)) {
274                 LOG.debug("finalizeReferent - deleting temp file {}", file);
275                 deleteFile(file);
276             }
277         }
278     }
279 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.