Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / io / FileBackedOutputStream.java
1 /*
2  * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.io;
9
10 import com.google.common.io.ByteSource;
11 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
12 import java.io.ByteArrayInputStream;
13 import java.io.ByteArrayOutputStream;
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.io.OutputStream;
18 import java.lang.ref.Cleaner;
19 import java.lang.ref.Cleaner.Cleanable;
20 import java.nio.file.Files;
21 import org.checkerframework.checker.lock.qual.GuardedBy;
22 import org.checkerframework.checker.lock.qual.Holding;
23 import org.eclipse.jdt.annotation.NonNull;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 /**
29  * An {@link OutputStream} that starts buffering to a byte array, but switches to file buffering once the data
30  * reaches a configurable size. This class is thread-safe.
31  *
32  * @author Thomas Pantelis
33  */
34 public class FileBackedOutputStream extends OutputStream {
35     private static final Logger LOG = LoggerFactory.getLogger(FileBackedOutputStream.class);
36
37     /**
38      * A Cleaner instance responsible for deleting any files which may be lost due to us not being cleaning up
39      * temporary files.
40      */
41     private static final Cleaner FILE_CLEANER = Cleaner.create();
42
43     private final int fileThreshold;
44     private final String fileDirectory;
45
46     @GuardedBy("this")
47     private MemoryOutputStream memory = new MemoryOutputStream();
48
49     @GuardedBy("this")
50     private OutputStream out = memory;
51
52     @GuardedBy("this")
53     private File file;
54
55     @GuardedBy("this")
56     private Cleanable fileCleanup;
57
58     @GuardedBy("this")
59     private ByteSource source;
60
61     private volatile long count;
62
63     /**
64      * Creates a new instance that uses the given file threshold, and does not reset the data when the
65      * {@link ByteSource} returned by {@link #asByteSource} is finalized.
66      *
67      * @param fileThreshold the number of bytes before the stream should switch to buffering to a file
68      * @param fileDirectory the directory in which to create the file if needed. If null, the default temp file
69      *                      location is used.
70      */
71     public FileBackedOutputStream(final int fileThreshold, @Nullable final String fileDirectory) {
72         this.fileThreshold = fileThreshold;
73         this.fileDirectory = fileDirectory;
74     }
75
76     /**
77      * Returns a readable {@link ByteSource} view of the data that has been written to this stream. This stream is
78      * closed and further attempts to write to it will result in an IOException.
79      *
80      * @return a ByteSource instance
81      * @throws IOException if close fails
82      */
83     public synchronized @NonNull ByteSource asByteSource() throws IOException {
84         close();
85
86         if (source == null) {
87             source = new ByteSource() {
88                 @Override
89                 public InputStream openStream() throws IOException {
90                     synchronized (FileBackedOutputStream.this) {
91                         if (file != null) {
92                             return Files.newInputStream(file.toPath());
93                         } else {
94                             return new ByteArrayInputStream(memory.buf(), 0, memory.count());
95                         }
96                     }
97                 }
98
99                 @Override
100                 public long size() {
101                     return count;
102                 }
103             };
104         }
105
106         return source;
107     }
108
109     @Override
110     @SuppressFBWarnings(value = "VO_VOLATILE_INCREMENT", justification = "Findbugs erroneously complains that the "
111         + "increment of count needs to be atomic even though it is inside a synchronized block.")
112     public synchronized void write(final int value) throws IOException {
113         possiblySwitchToFile(1);
114         out.write(value);
115         count++;
116     }
117
118     @Override
119     public synchronized void write(final byte[] bytes) throws IOException {
120         write(bytes, 0, bytes.length);
121     }
122
123     @Override
124     public synchronized void write(final byte[] bytes, final int off, final int len) throws IOException {
125         possiblySwitchToFile(len);
126         out.write(bytes, off, len);
127         count += len;
128     }
129
130     @Override
131     public synchronized void close() throws IOException {
132         if (out != null) {
133             OutputStream closeMe = out;
134             out = null;
135             closeMe.close();
136         }
137     }
138
139     @Override
140     public synchronized void flush() throws IOException {
141         if (out != null) {
142             out.flush();
143         }
144     }
145
146     public synchronized long getCount() {
147         return count;
148     }
149
150     /**
151      * Calls {@link #close} if not already closed and, if data was buffered to a file, deletes the file.
152      */
153     public synchronized void cleanup() {
154         LOG.debug("In cleanup");
155         closeQuietly();
156         if (fileCleanup != null) {
157             fileCleanup.clean();
158         }
159         // Already deleted above
160         file = null;
161     }
162
163     @Holding("this")
164     private void closeQuietly() {
165         try {
166             close();
167         } catch (IOException e) {
168             LOG.warn("Error closing output stream {}", out, e);
169         }
170     }
171
172     /**
173      * Checks if writing {@code len} bytes would go over threshold, and switches to file buffering if so.
174      */
175     @Holding("this")
176     private void possiblySwitchToFile(final int len) throws IOException {
177         if (out == null) {
178             throw new IOException("Stream already closed");
179         }
180
181         if (file == null && memory.count() + len > fileThreshold) {
182             final File temp = File.createTempFile("FileBackedOutputStream", null,
183                     fileDirectory == null ? null : new File(fileDirectory));
184             temp.deleteOnExit();
185             final Cleaner.Cleanable cleanup = FILE_CLEANER.register(this, () -> deleteFile(temp));
186
187             LOG.debug("Byte count {} has exceeded threshold {} - switching to file: {}", memory.count() + len,
188                     fileThreshold, temp);
189
190             final OutputStream transfer;
191             try {
192                 transfer = Files.newOutputStream(temp.toPath());
193                 try {
194                     transfer.write(memory.buf(), 0, memory.count());
195                     transfer.flush();
196                 } catch (IOException e) {
197                     try {
198                         transfer.close();
199                     } catch (IOException ex) {
200                         LOG.debug("Error closing temp file {}", temp, ex);
201                     }
202                     throw e;
203                 }
204             } catch (IOException e) {
205                 cleanup.clean();
206                 throw e;
207             }
208
209             // We've successfully transferred the data; switch to writing to file
210             out = transfer;
211             file = temp;
212             fileCleanup = cleanup;
213             memory = null;
214         }
215     }
216
217     private static void deleteFile(final File file) {
218         LOG.debug("Deleting temp file {}", file);
219         if (!file.delete()) {
220             LOG.warn("Could not delete temp file {}", file);
221         }
222     }
223
224     /**
225      * ByteArrayOutputStream that exposes its internals for efficiency.
226      */
227     private static final class MemoryOutputStream extends ByteArrayOutputStream {
228         byte[] buf() {
229             return buf;
230         }
231
232         int count() {
233             return count;
234         }
235     }
236 }