2 * Copyright (c) 2017 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.io;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.FinalizablePhantomReference;
12 import com.google.common.base.FinalizableReferenceQueue;
13 import com.google.common.collect.Sets;
14 import com.google.common.io.ByteSource;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import java.io.ByteArrayInputStream;
17 import java.io.ByteArrayOutputStream;
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.io.OutputStream;
22 import java.nio.file.Files;
23 import java.util.Iterator;
25 import org.checkerframework.checker.lock.qual.GuardedBy;
26 import org.checkerframework.checker.lock.qual.Holding;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.eclipse.jdt.annotation.Nullable;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
33 * An {@link OutputStream} that starts buffering to a byte array, but switches to file buffering once the data
34 * reaches a configurable size. This class is thread-safe.
36 * @author Thomas Pantelis
38 public class FileBackedOutputStream extends OutputStream {
39 private static final Logger LOG = LoggerFactory.getLogger(FileBackedOutputStream.class);
42 * This stores the Cleanup PhantomReference instances statically. This is necessary because PhantomReferences
43 * need a hard reference so they're not garbage collected. Once finalized, the Cleanup PhantomReference removes
44 * itself from this set and thus becomes eligible for garbage collection.
47 static final Set<Cleanup> REFERENCE_CACHE = Sets.newConcurrentHashSet();
50 * Used as the ReferenceQueue for the Cleanup PhantomReferences.
52 private static final FinalizableReferenceQueue REFERENCE_QUEUE = new FinalizableReferenceQueue();
// Size limit compared against the in-memory byte count in possiblySwitchToFile; exceeding it triggers the
// switch to a temp file.
54 private final int fileThreshold;
// Directory for the temp file. When null, possiblySwitchToFile passes a null directory to File.createTempFile.
55 private final String fileDirectory;
// In-memory buffer; its contents are copied into the temp file when the threshold is exceeded.
58 private MemoryOutputStream memory = new MemoryOutputStream();
// Current write target; starts out as the in-memory buffer.
61 private OutputStream out = memory;
// Assigned in asByteSource() with the ByteSource view created there.
67 private ByteSource source;
// Presumably the total number of bytes written (exposed via getCount()) - TODO confirm. Declared volatile; the
// write(int) annotation states that its increment happens inside a synchronized block.
69 private volatile long count;
72 * Creates a new instance that uses the given file threshold, and does not reset the data when the
73 * {@link ByteSource} returned by {@link #asByteSource} is finalized.
75 * @param fileThreshold the number of bytes before the stream should switch to buffering to a file
76 * @param fileDirectory the directory in which to create the file if needed. If null, the default temp file
// Records the configuration; both values are consulted later by possiblySwitchToFile.
79 public FileBackedOutputStream(final int fileThreshold, @Nullable final String fileDirectory) {
80 this.fileThreshold = fileThreshold;
// May be null: possiblySwitchToFile then hands a null directory to File.createTempFile, which selects the
// default temporary-file directory.
81 this.fileDirectory = fileDirectory;
85 * Returns a readable {@link ByteSource} view of the data that has been written to this stream. This stream is
86 * closed and further attempts to write to it will result in an IOException.
88 * @return a ByteSource instance
89 * @throws IOException if close fails
// Creates the ByteSource view and stores it in the 'source' field.
// NOTE(review): the statements between the signature and the assignment are not visible in this excerpt
// (presumably the close of this stream and a reuse check on 'source') - confirm.
91 public synchronized @NonNull ByteSource asByteSource() throws IOException {
95 source = new ByteSource() {
97 public InputStream openStream() throws IOException {
// Locks on the enclosing stream, i.e. the same monitor held by this class's synchronized methods.
98 synchronized (FileBackedOutputStream.this) {
// File-backed case: read directly from the temp file.
// NOTE(review): the condition selecting between the two returns is not visible here - presumably file != null.
100 return Files.newInputStream(file.toPath());
// Memory-backed case: expose the first getCount() bytes of the buffer returned by getBuffer().
102 return new ByteArrayInputStream(memory.getBuffer(), 0, memory.getCount());
// Single-byte write. NOTE(review): the write to 'out' and the count increment referred to by the annotation
// below are not visible in this excerpt.
118 @SuppressFBWarnings(value = "VO_VOLATILE_INCREMENT", justification = "Findbugs erroneously complains that the "
119 + "increment of count needs to be atomic even though it is inside a synchronized block.")
120 public synchronized void write(final int value) throws IOException {
// One byte is about to be written, so the threshold check is made with a length of 1.
121 possiblySwitchToFile(1);
// Writes the whole array by delegating to the (bytes, off, len) overload.
127 public synchronized void write(final byte[] bytes) throws IOException {
128 write(bytes, 0, bytes.length);
// Writes len bytes of the array, starting at off, to the current target stream.
132 public synchronized void write(final byte[] bytes, final int off, final int len) throws IOException {
// The threshold check runs before the write, so when this write would push the in-memory count past the
// threshold the switch to the temp file has already been attempted by the time the bytes are written.
133 possiblySwitchToFile(len);
134 out.write(bytes, off, len);
139 public synchronized void close() throws IOException {
141 OutputStream closeMe = out;
148 public synchronized void flush() throws IOException {
154 public synchronized long getCount() {
159 * Calls {@link #close} if not already closed and, if data was buffered to a file, deletes the file.
161 public synchronized void cleanup() {
162 LOG.debug("In cleanup");
// Walks the static reference cache looking for the Cleanup entry whose file equals this stream's temp file.
// NOTE(review): the action taken on a match is not visible in this excerpt (presumably iter.remove() followed
// by a break) - confirm. The guard that protects file.equals(...) from a null 'file' is not visible either.
167 Iterator<Cleanup> iter = REFERENCE_CACHE.iterator();
168 while (iter.hasNext()) {
169 if (file.equals(iter.next().file)) {
// NOTE(review): only the log statement is visible; the delete call itself is outside this excerpt.
175 LOG.debug("cleanup - deleting temp file {}", file);
// Close helper that declares no checked exception: an IOException is caught and logged at WARN level.
// NOTE(review): the call being guarded is not visible in this excerpt (presumably close()).
183 private void closeQuietly() {
186 } catch (IOException e) {
187 LOG.warn("Error closing output stream {}", out, e);
192 * Checks if writing {@code len} bytes would go over threshold, and switches to file buffering if so.
// Called by write(int) with 1 and by write(byte[], int, int) with len, in both cases before the bytes are written.
195 private void possiblySwitchToFile(final int len) throws IOException {
// NOTE(review): the condition guarding this throw is not visible in this excerpt.
197 throw new IOException("Stream already closed");
// Switch at most once (file is still null), and only when the bytes already buffered in memory plus the
// bytes about to be written would exceed the threshold.
200 if (file == null && memory.getCount() + len > fileThreshold) {
// A null directory argument makes File.createTempFile use the default temporary-file directory.
201 File temp = File.createTempFile("FileBackedOutputStream", null,
202 fileDirectory == null ? null : new File(fileDirectory));
205 LOG.debug("Byte count {} has exceeded threshold {} - switching to file: {}", memory.getCount() + len,
206 fileThreshold, temp);
// Declared outside the try so the catch block below can tell whether the file stream was opened.
208 OutputStream transfer = null;
210 transfer = Files.newOutputStream(temp.toPath());
// Copy everything buffered in memory so far into the temp file.
211 transfer.write(memory.getBuffer(), 0, memory.getCount());
214 // We've successfully transferred the data; switch to writing to file
// The new Cleanup is deliberately not stored here: its constructor adds it to the static REFERENCE_CACHE.
// NOTE(review): the assignments to 'out' and 'file' are not visible in this excerpt.
219 new Cleanup(this, file);
220 } catch (IOException e) {
// transfer is non-null only if opening the temp file's stream succeeded.
// NOTE(review): the close attempt itself is not visible here; only the logging of its failure is.
221 if (transfer != null) {
224 } catch (IOException ex) {
225 LOG.debug("Error closing temp file {}", temp, ex);
// Best-effort delete of a temp file.
235 private static void deleteFile(final File file) {
// File.delete() reports failure through its boolean return value, so a failed delete is only logged.
236 if (!file.delete()) {
237 LOG.warn("Could not delete temp file {}", file);
242 * ByteArrayOutputStream that exposes its internals for efficiency.
244 private static class MemoryOutputStream extends ByteArrayOutputStream {
255 * PhantomReference that deletes the temp file when the FileBackedOutputStream is garbage collected.
257 private static class Cleanup extends FinalizablePhantomReference<FileBackedOutputStream> {
258 private final File file;
260 Cleanup(final FileBackedOutputStream referent, final File file) {
261 super(referent, REFERENCE_QUEUE);
264 REFERENCE_CACHE.add(this);
266 LOG.debug("Added Cleanup for temp file {}", file);
270 public void finalizeReferent() {
271 LOG.debug("In finalizeReferent");
273 if (REFERENCE_CACHE.remove(this)) {
274 LOG.debug("finalizeReferent - deleting temp file {}", file);