*/
package org.opendaylight.controller.cluster.io;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.FinalizablePhantomReference;
-import com.google.common.base.FinalizableReferenceQueue;
-import com.google.common.collect.Sets;
import com.google.common.io.ByteSource;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.Iterator;
-import java.util.Set;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
-import javax.annotation.concurrent.ThreadSafe;
+import java.lang.ref.Cleaner;
+import java.lang.ref.Cleaner.Cleanable;
+import java.nio.file.Files;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.checkerframework.checker.lock.qual.Holding;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
*
* @author Thomas Pantelis
*/
-@ThreadSafe
public class FileBackedOutputStream extends OutputStream {
private static final Logger LOG = LoggerFactory.getLogger(FileBackedOutputStream.class);
/**
- * This stores the Cleanup PhantomReference instances statically. This is necessary because PhantomReferences
- * need a hard reference so they're not garbage collected. Once finalized, the Cleanup PhantomReference removes
- * itself from this map and thus becomes eligible for garbage collection.
+ * A Cleaner instance responsible for deleting any files which may be lost due to us not cleaning up
+ * temporary files.
*/
- @VisibleForTesting
- static final Set<Cleanup> REFERENCE_CACHE = Sets.newConcurrentHashSet();
-
- /**
- * Used as the ReferenceQueue for the Cleanup PhantomReferences.
- */
- private static final FinalizableReferenceQueue REFERENCE_QUEUE = new FinalizableReferenceQueue();
+ private static final Cleaner FILE_CLEANER = Cleaner.create();
private final int fileThreshold;
private final String fileDirectory;
@GuardedBy("this")
private File file;
+ @GuardedBy("this")
+ private Cleanable fileCleanup;
+
@GuardedBy("this")
private ByteSource source;
* @param fileDirectory the directory in which to create the file if needed. If null, the default temp file
* location is used.
*/
- public FileBackedOutputStream(int fileThreshold, @Nullable String fileDirectory) {
+ public FileBackedOutputStream(final int fileThreshold, @Nullable final String fileDirectory) {
this.fileThreshold = fileThreshold;
this.fileDirectory = fileDirectory;
}
* @return a ByteSource instance
* @throws IOException if close fails
*/
- @Nonnull
- public synchronized ByteSource asByteSource() throws IOException {
+ public synchronized @NonNull ByteSource asByteSource() throws IOException {
close();
if (source == null) {
public InputStream openStream() throws IOException {
synchronized (FileBackedOutputStream.this) {
if (file != null) {
- return new FileInputStream(file);
+ return Files.newInputStream(file.toPath());
} else {
return new ByteArrayInputStream(memory.getBuffer(), 0, memory.getCount());
}
@Override
@SuppressFBWarnings(value = "VO_VOLATILE_INCREMENT", justification = "Findbugs erroneously complains that the "
+ "increment of count needs to be atomic even though it is inside a synchronized block.")
- public synchronized void write(int value) throws IOException {
+ public synchronized void write(final int value) throws IOException {
possiblySwitchToFile(1);
out.write(value);
count++;
}
@Override
- public synchronized void write(byte[] bytes) throws IOException {
+ public synchronized void write(final byte[] bytes) throws IOException {
write(bytes, 0, bytes.length);
}
@Override
- public synchronized void write(byte[] bytes, int off, int len) throws IOException {
+ public synchronized void write(final byte[] bytes, final int off, final int len) throws IOException {
possiblySwitchToFile(len);
out.write(bytes, off, len);
count += len;
*/
public synchronized void cleanup() {
LOG.debug("In cleanup");
-
closeQuietly();
-
- if (file != null) {
- Iterator<Cleanup> iter = REFERENCE_CACHE.iterator();
- while (iter.hasNext()) {
- if (file.equals(iter.next().file)) {
- iter.remove();
- break;
- }
- }
-
- LOG.debug("cleanup - deleting temp file {}", file);
-
- deleteFile(file);
- file = null;
+ if (fileCleanup != null) {
+ fileCleanup.clean();
}
+ // Already deleted above
+ file = null;
}
- @GuardedBy("this")
+ @Holding("this")
private void closeQuietly() {
try {
close();
/**
* Checks if writing {@code len} bytes would go over threshold, and switches to file buffering if so.
*/
- @GuardedBy("this")
- private void possiblySwitchToFile(int len) throws IOException {
+ @Holding("this")
+ private void possiblySwitchToFile(final int len) throws IOException {
if (out == null) {
throw new IOException("Stream already closed");
}
if (file == null && memory.getCount() + len > fileThreshold) {
- File temp = File.createTempFile("FileBackedOutputStream", null, new File(fileDirectory));
+ final File temp = File.createTempFile("FileBackedOutputStream", null,
+ fileDirectory == null ? null : new File(fileDirectory));
temp.deleteOnExit();
+ final Cleaner.Cleanable cleanup = FILE_CLEANER.register(this, () -> deleteFile(temp));
LOG.debug("Byte count {} has exceeded threshold {} - switching to file: {}", memory.getCount() + len,
fileThreshold, temp);
- FileOutputStream transfer = null;
+ final OutputStream transfer;
try {
- transfer = new FileOutputStream(temp);
- transfer.write(memory.getBuffer(), 0, memory.getCount());
- transfer.flush();
-
- // We've successfully transferred the data; switch to writing to file
- out = transfer;
- file = temp;
- memory = null;
-
- new Cleanup(this, file);
- } catch (IOException e) {
- if (transfer != null) {
+ transfer = Files.newOutputStream(temp.toPath());
+ try {
+ transfer.write(memory.getBuffer(), 0, memory.getCount());
+ transfer.flush();
+ } catch (IOException e) {
try {
transfer.close();
} catch (IOException ex) {
LOG.debug("Error closing temp file {}", temp, ex);
}
+ throw e;
}
-
- deleteFile(temp);
+ } catch (IOException e) {
+ cleanup.clean();
throw e;
}
+
+ // We've successfully transferred the data; switch to writing to file
+ out = transfer;
+ file = temp;
+ fileCleanup = cleanup;
+ memory = null;
}
}
- private static void deleteFile(File file) {
+ private static void deleteFile(final File file) {
+ LOG.debug("Deleting temp file {}", file);
if (!file.delete()) {
LOG.warn("Could not delete temp file {}", file);
}
return count;
}
}
-
- /**
- * PhantomReference that deletes the temp file when the FileBackedOutputStream is garbage collected.
- */
- private static class Cleanup extends FinalizablePhantomReference<FileBackedOutputStream> {
- private final File file;
-
- Cleanup(FileBackedOutputStream referent, File file) {
- super(referent, REFERENCE_QUEUE);
- this.file = file;
-
- REFERENCE_CACHE.add(this);
-
- LOG.debug("Added Cleanup for temp file {}", file);
- }
-
- @Override
- public void finalizeReferent() {
- LOG.debug("In finalizeReferent");
-
- if (REFERENCE_CACHE.remove(this)) {
- LOG.debug("finalizeReferent - deleting temp file {}", file);
- deleteFile(file);
- }
- }
- }
}