* @author Kamal Rameshan
*/
public interface ConfigParams {
+ int MEGABYTE = 1048576;
+
/**
* Returns the minimum number of entries to be present in the in-memory Raft log for a snapshot to be taken.
*
*/
String getCustomRaftPolicyImplementationClass();
+ /**
+ * Returns the directory in which to create temp files.
+ *
+ * @return the directory in which to create temp files.
+ */
+ @Nonnull
+ String getTempFileDirectory();
+
+ /**
+ * Returns the threshold in terms of number of bytes when streaming data before it should switch from storing in
+ * memory to buffering to a file.
+ *
+ * @return the threshold in terms of number of bytes.
+ */
+ int getFileBackedStreamingThreshold();
}
private PeerAddressResolver peerAddressResolver = NoopPeerAddressResolver.INSTANCE;
+ private String tempFileDirectory = "";
+
+ private int fileBackedStreamingThreshold = 128 * MEGABYTE;
+
public void setHeartBeatInterval(FiniteDuration heartBeatInterval) {
this.heartBeatInterval = heartBeatInterval;
electionTimeOutInterval = null;
electionTimeOutInterval = null;
}
+ public void setTempFileDirectory(String tempFileDirectory) {
+ this.tempFileDirectory = tempFileDirectory;
+ }
+
+ public void setFileBackedStreamingThreshold(int fileBackedStreamingThreshold) {
+ this.fileBackedStreamingThreshold = fileBackedStreamingThreshold;
+ }
+
public void setCustomRaftPolicyImplementationClass(String customRaftPolicyImplementationClass) {
this.customRaftPolicyImplementationClass = customRaftPolicyImplementationClass;
}
return policySupplier.get();
}
+ @Override
+ public String getTempFileDirectory() {
+ return tempFileDirectory;
+ }
+
+ @Override
+ public int getFileBackedStreamingThreshold() {
+ return fileBackedStreamingThreshold;
+ }
+
private class PolicySupplier implements Supplier<RaftPolicy> {
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
* @return the Consumer
*/
Consumer<ApplyState> getApplyStateConsumer();
+
+ /**
+ * Creates a FileBackedOutputStream with a common configuration.
+ *
+ * @return a FileBackedOutputStream instance
+ */
+ @Nonnull
+ FileBackedOutputStream newFileBackedOutputStream();
}
import java.util.function.LongSupplier;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
return applyStateConsumer;
}
+ @Override
+ public FileBackedOutputStream newFileBackedOutputStream() {
+ return new FileBackedOutputStream(configParams.getFileBackedStreamingThreshold(),
+ configParams.getTempFileDirectory());
+ }
+
@SuppressWarnings("checkstyle:IllegalCatch")
void close() {
if (currentBehavior != null) {
import akka.persistence.SnapshotSelectionCriteria;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.ByteSource;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
OutputStream installSnapshotStream = null;
if (targetFollower != null) {
- installSnapshotStream = new ByteArrayOutputStream();
+ installSnapshotStream = context.newFileBackedOutputStream();
log.info("{}: Initiating snapshot capture {} to install on {}",
persistenceId(), captureSnapshot, targetFollower);
} else {
context.getReplicatedLog().getSnapshotTerm());
if (installSnapshotStream.isPresent()) {
- try {
- installSnapshotStream.get().close();
- } catch (IOException e) {
- log.warn("Error closing install snapshot OutputStream", e);
- }
-
if (context.getId().equals(currentBehavior.getLeaderId())) {
- ByteSource snapshotBytes = ByteSource.wrap(((ByteArrayOutputStream)installSnapshotStream.get())
- .toByteArray());
-
- // this would be call straight to the leader and won't initiate in serialization
- currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(snapshot, snapshotBytes));
+ try {
+ ByteSource snapshotBytes = ((FileBackedOutputStream)installSnapshotStream.get()).asByteSource();
+ currentBehavior.handleMessage(context.getActor(),
+ new SendInstallSnapshot(snapshot, snapshotBytes));
+ } catch (IOException e) {
+ log.error("{}: Snapshot install failed due to an unrecoverable streaming error",
+ context.getId(), e);
+ }
+ } else {
+ ((FileBackedOutputStream)installSnapshotStream.get()).cleanup();
}
}
if (snapshotTracker != null && !snapshotTracker.getLeaderId().equals(appendEntries.getLeaderId())) {
log.debug("{}: snapshot install is in progress but the prior snapshot leaderId {} does not match the "
+ "AppendEntries leaderId {}", logName(), snapshotTracker.getLeaderId(), appendEntries.getLeaderId());
- snapshotTracker = null;
+ closeSnapshotTracker();
}
if (snapshotTracker != null || context.getSnapshotManager().isApplying()) {
leaderId = installSnapshot.getLeaderId();
if (snapshotTracker == null) {
- snapshotTracker = new SnapshotTracker(log, installSnapshot.getTotalChunks(), installSnapshot.getLeaderId());
+ snapshotTracker = new SnapshotTracker(log, installSnapshot.getTotalChunks(), installSnapshot.getLeaderId(),
+ context);
}
updateInitialSyncStatus(installSnapshot.getLastIncludedIndex(), installSnapshot.getLeaderId());
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteSource;
-import com.google.common.io.CountingOutputStream;
-import java.io.ByteArrayOutputStream;
+import java.io.BufferedOutputStream;
import java.io.IOException;
import java.util.Arrays;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.slf4j.Logger;
/**
private final Logger log;
private final int totalChunks;
private final String leaderId;
- private final CountingOutputStream countingStream;
- private final ByteArrayOutputStream backingStream;
+ private final BufferedOutputStream bufferedStream;
+ private final FileBackedOutputStream fileBackedStream;
private int lastChunkIndex = LeaderInstallSnapshotState.FIRST_CHUNK_INDEX - 1;
private boolean sealed = false;
private int lastChunkHashCode = LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE;
+ private long count;
- SnapshotTracker(Logger log, int totalChunks, String leaderId) {
+ SnapshotTracker(Logger log, int totalChunks, String leaderId, RaftActorContext context) {
this.log = log;
this.totalChunks = totalChunks;
this.leaderId = Preconditions.checkNotNull(leaderId);
-
- backingStream = new ByteArrayOutputStream();
- countingStream = new CountingOutputStream(backingStream);
+ fileBackedStream = context.newFileBackedOutputStream();
+ bufferedStream = new BufferedOutputStream(fileBackedStream);
}
/**
boolean addChunk(int chunkIndex, byte[] chunk, Optional<Integer> maybeLastChunkHashCode)
throws InvalidChunkException, IOException {
log.debug("addChunk: chunkIndex={}, lastChunkIndex={}, collectedChunks.size={}, lastChunkHashCode={}",
- chunkIndex, lastChunkIndex, countingStream.getCount(), this.lastChunkHashCode);
+ chunkIndex, lastChunkIndex, count, this.lastChunkHashCode);
if (sealed) {
throw new InvalidChunkException("Invalid chunk received with chunkIndex " + chunkIndex
+ maybeLastChunkHashCode.get());
}
- countingStream.write(chunk);
+ bufferedStream.write(chunk);
+ count += chunk.length;
sealed = chunkIndex == totalChunks;
lastChunkIndex = chunkIndex;
this.lastChunkHashCode = Arrays.hashCode(chunk);
return sealed;
}
- ByteSource getSnapshotBytes() {
+ ByteSource getSnapshotBytes() throws IOException {
if (!sealed) {
throw new IllegalStateException("lastChunk not received yet");
}
- return ByteSource.wrap(backingStream.toByteArray());
+ bufferedStream.close();
+ return fileBackedStream.asByteSource();
}
String getLeaderId() {
@Override
public void close() {
- try {
- countingStream.close();
- } catch (IOException e) {
- log.warn("Error closing snapshot stream");
- }
+ fileBackedStream.cleanup();
}
public static class InvalidChunkException extends IOException {
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
import org.opendaylight.controller.cluster.raft.SnapshotManager.LastAppliedTermInformationReader;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
doReturn(5L).when(mockElectionTerm).getCurrentTerm();
doReturn("member5").when(mockElectionTerm).getVotedFor();
+ doReturn(new FileBackedOutputStream(10000000, "target")).when(mockRaftActorContext).newFileBackedOutputStream();
+
snapshotManager = new SnapshotManager(mockRaftActorContext, LoggerFactory.getLogger(this.getClass()));
factory = new TestActorFactory(getSystem());
package org.opendaylight.controller.cluster.raft.behaviors;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
import com.google.common.base.Optional;
import com.google.common.io.ByteSource;
import org.apache.commons.lang3.SerializationUtils;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SnapshotTrackerTest {
+ private static final Logger LOG = LoggerFactory.getLogger(SnapshotTrackerTest.class);
- Logger logger = LoggerFactory.getLogger(getClass());
-
- Map<String, String> data;
- ByteString byteString;
- byte[] chunk1;
- byte[] chunk2;
- byte[] chunk3;
+ @Mock
+ private RaftActorContext mockContext;
+ private FileBackedOutputStream fbos;
+ private Map<String, String> data;
+ private ByteString byteString;
+ private byte[] chunk1;
+ private byte[] chunk2;
+ private byte[] chunk3;
@Before
public void setup() {
+ MockitoAnnotations.initMocks(this);
+
data = new HashMap<>();
data.put("key1", "value1");
data.put("key2", "value2");
chunk1 = getNextChunk(byteString, 0, 10);
chunk2 = getNextChunk(byteString, 10, 10);
chunk3 = getNextChunk(byteString, 20, byteString.size());
+
+ fbos = spy(new FileBackedOutputStream(100000000, "target"));
+ doReturn(fbos).when(mockContext).newFileBackedOutputStream();
}
@Test
public void testAddChunks() throws IOException {
- SnapshotTracker tracker = new SnapshotTracker(logger, 3, "leader");
-
- tracker.addChunk(1, chunk1, Optional.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE));
- tracker.addChunk(2, chunk2, Optional.of(Arrays.hashCode(chunk1)));
- tracker.addChunk(3, chunk3, Optional.of(Arrays.hashCode(chunk2)));
+ try (SnapshotTracker tracker = new SnapshotTracker(LOG, 3, "leader", mockContext)) {
+ tracker.addChunk(1, chunk1, Optional.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE));
+ tracker.addChunk(2, chunk2, Optional.of(Arrays.hashCode(chunk1)));
+ tracker.addChunk(3, chunk3, Optional.of(Arrays.hashCode(chunk2)));
- ByteSource snapshotBytes = tracker.getSnapshotBytes();
- assertEquals("Deserialized", data, SerializationUtils.deserialize(snapshotBytes.read()));
+ ByteSource snapshotBytes = tracker.getSnapshotBytes();
+ assertEquals("Deserialized", data, SerializationUtils.deserialize(snapshotBytes.read()));
+ }
- tracker.close();
+ verify(fbos).cleanup();
}
@Test(expected = SnapshotTracker.InvalidChunkException.class)
public void testAddChunkWhenAlreadySealed() throws IOException {
- try (SnapshotTracker tracker = new SnapshotTracker(logger, 2, "leader")) {
+ try (SnapshotTracker tracker = new SnapshotTracker(LOG, 2, "leader", mockContext)) {
tracker.addChunk(1, chunk1, Optional.<Integer>absent());
tracker.addChunk(2, chunk2, Optional.<Integer>absent());
tracker.addChunk(3, chunk3, Optional.<Integer>absent());
@Test(expected = SnapshotTracker.InvalidChunkException.class)
public void testInvalidFirstChunkIndex() throws IOException {
- try (SnapshotTracker tracker = new SnapshotTracker(logger, 2, "leader")) {
+ try (SnapshotTracker tracker = new SnapshotTracker(LOG, 2, "leader", mockContext)) {
tracker.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX - 1, chunk1, Optional.<Integer>absent());
}
}
@Test(expected = SnapshotTracker.InvalidChunkException.class)
public void testOutOfSequenceChunk() throws IOException {
- try (SnapshotTracker tracker = new SnapshotTracker(logger, 2, "leader")) {
+ try (SnapshotTracker tracker = new SnapshotTracker(LOG, 2, "leader", mockContext)) {
tracker.addChunk(1, chunk1, Optional.<Integer>absent());
tracker.addChunk(3, chunk3, Optional.<Integer>absent());
}
@Test(expected = SnapshotTracker.InvalidChunkException.class)
public void testInvalidLastChunkHashCode() throws IOException {
- try (SnapshotTracker tracker = new SnapshotTracker(logger, 2, "leader")) {
+ try (SnapshotTracker tracker = new SnapshotTracker(LOG, 2, "leader", mockContext)) {
tracker.addChunk(1, chunk1, Optional.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE));
tracker.addChunk(2, chunk2, Optional.of(1));
}
@Test(expected = IllegalStateException.class)
public void testGetSnapshotBytesWhenNotSealed() throws IOException {
- try (SnapshotTracker tracker = new SnapshotTracker(logger, 2, "leader")) {
+ try (SnapshotTracker tracker = new SnapshotTracker(LOG, 2, "leader", mockContext)) {
tracker.addChunk(1, chunk1, Optional.<Integer>absent());
tracker.getSnapshotBytes();
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.io;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.FinalizablePhantomReference;
+import com.google.common.base.FinalizableReferenceQueue;
+import com.google.common.collect.Sets;
+import com.google.common.io.ByteSource;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.Set;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link OutputStream} that starts buffering to a byte array, but switches to file buffering once the data
+ * reaches a configurable size. This class is thread-safe.
+ *
+ * @author Thomas Pantelis
+ */
+@ThreadSafe
+public class FileBackedOutputStream extends OutputStream {
+ private static final Logger LOG = LoggerFactory.getLogger(FileBackedOutputStream.class);
+
+ /**
+ * This stores the Cleanup PhantomReference instances statically. This is necessary because PhantomReferences
+ * need a hard reference so they're not garbage collected. Once finalized, the Cleanup PhantomReference removes
+ * itself from this map and thus becomes eligible for garbage collection.
+ */
+ @VisibleForTesting
+ static final Set<Cleanup> REFERENCE_CACHE = Sets.newConcurrentHashSet();
+
+ /**
+ * Used as the ReferenceQueue for the Cleanup PhantomReferences.
+ */
+ private static final FinalizableReferenceQueue REFERENCE_QUEUE = new FinalizableReferenceQueue();
+
+ private final int fileThreshold;
+ private final String fileDirectory;
+
+ @GuardedBy("this")
+ private MemoryOutputStream memory = new MemoryOutputStream();
+
+ @GuardedBy("this")
+ private OutputStream out = memory;
+
+ @GuardedBy("this")
+ private File file;
+
+ @GuardedBy("this")
+ private ByteSource source;
+
+ private volatile long count;
+
+ /**
+ * Creates a new instance that uses the given file threshold, and does not reset the data when the
+ * {@link ByteSource} returned by {@link #asByteSource} is finalized.
+ *
+ * @param fileThreshold the number of bytes before the stream should switch to buffering to a file
+ * @param fileDirectory the directory in which to create the file if needed. If null, the default temp file
+ * location is used.
+ */
+ public FileBackedOutputStream(int fileThreshold, @Nullable String fileDirectory) {
+ this.fileThreshold = fileThreshold;
+ this.fileDirectory = fileDirectory;
+ }
+
+ /**
+ * Returns a readable {@link ByteSource} view of the data that has been written to this stream. This stream is
+ * closed and further attempts to write to it will result in an IOException.
+ *
+ * @return a ByteSource instance
+ * @throws IOException if close fails
+ */
+ @Nonnull
+ public synchronized ByteSource asByteSource() throws IOException {
+ close();
+
+ if (source == null) {
+ source = new ByteSource() {
+ @Override
+ public InputStream openStream() throws IOException {
+ synchronized (FileBackedOutputStream.this) {
+ if (file != null) {
+ return new FileInputStream(file);
+ } else {
+ return new ByteArrayInputStream(memory.getBuffer(), 0, memory.getCount());
+ }
+ }
+ }
+
+ @Override
+ public long size() {
+ return count;
+ }
+ };
+ }
+
+ return source;
+ }
+
+ @Override
+ @SuppressFBWarnings(value = "VO_VOLATILE_INCREMENT", justification = "Findbugs erroneously complains that the "
+ + "increment of count needs to be atomic even though it is inside a synchronized block.")
+ public synchronized void write(int value) throws IOException {
+ possiblySwitchToFile(1);
+ out.write(value);
+ count++;
+ }
+
+ @Override
+ public synchronized void write(byte[] bytes) throws IOException {
+ write(bytes, 0, bytes.length);
+ }
+
+ @Override
+ public synchronized void write(byte[] bytes, int off, int len) throws IOException {
+ possiblySwitchToFile(len);
+ out.write(bytes, off, len);
+ count += len;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if (out != null) {
+ OutputStream closeMe = out;
+ out = null;
+ closeMe.close();
+ }
+ }
+
+ @Override
+ public synchronized void flush() throws IOException {
+ if (out != null) {
+ out.flush();
+ }
+ }
+
+ public synchronized long getCount() {
+ return count;
+ }
+
+ /**
+ * Calls {@link #close} if not already closed and, if data was buffered to a file, deletes the file.
+ */
+ public synchronized void cleanup() {
+ LOG.debug("In cleanup");
+
+ closeQuietly();
+
+ if (file != null) {
+ Iterator<Cleanup> iter = REFERENCE_CACHE.iterator();
+ while (iter.hasNext()) {
+ if (file.equals(iter.next().file)) {
+ iter.remove();
+ break;
+ }
+ }
+
+ LOG.debug("cleanup - deleting temp file {}", file);
+
+ deleteFile(file);
+ file = null;
+ }
+ }
+
+ @GuardedBy("this")
+ private void closeQuietly() {
+ try {
+ close();
+ } catch (IOException e) {
+ LOG.warn("Error closing output stream {}", out, e);
+ }
+ }
+
+ /**
+ * Checks if writing {@code len} bytes would go over threshold, and switches to file buffering if so.
+ */
+ @GuardedBy("this")
+ private void possiblySwitchToFile(int len) throws IOException {
+ if (out == null) {
+ throw new IOException("Stream already closed");
+ }
+
+ if (file == null && memory.getCount() + len > fileThreshold) {
+ File temp = File.createTempFile("FileBackedOutputStream", null, new File(fileDirectory));
+ temp.deleteOnExit();
+
+ LOG.debug("Byte count {} has exceeded threshold {} - switching to file: {}", memory.getCount() + len,
+ fileThreshold, temp);
+
+ FileOutputStream transfer = null;
+ try {
+ transfer = new FileOutputStream(temp);
+ transfer.write(memory.getBuffer(), 0, memory.getCount());
+ transfer.flush();
+
+ // We've successfully transferred the data; switch to writing to file
+ out = transfer;
+ file = temp;
+ memory = null;
+
+ new Cleanup(this, file);
+ } catch (IOException e) {
+ if (transfer != null) {
+ try {
+ transfer.close();
+ } catch (IOException ex) {
+ LOG.debug("Error closing temp file {}", temp, ex);
+ }
+ }
+
+ deleteFile(temp);
+ throw e;
+ }
+ }
+ }
+
+ private static void deleteFile(File file) {
+ if (!file.delete()) {
+ LOG.warn("Could not delete temp file {}", file);
+ }
+ }
+
+ /**
+ * ByteArrayOutputStream that exposes its internals for efficiency.
+ */
+ private static class MemoryOutputStream extends ByteArrayOutputStream {
+ byte[] getBuffer() {
+ return buf;
+ }
+
+ int getCount() {
+ return count;
+ }
+ }
+
+ /**
+ * PhantomReference that deletes the temp file when the FileBackedOutputStream is garbage collected.
+ */
+ private static class Cleanup extends FinalizablePhantomReference<FileBackedOutputStream> {
+ private final File file;
+
+ Cleanup(FileBackedOutputStream referent, File file) {
+ super(referent, REFERENCE_QUEUE);
+ this.file = file;
+
+ REFERENCE_CACHE.add(this);
+
+ LOG.debug("Added Cleanup for temp file {}", file);
+ }
+
+ @Override
+ public void finalizeReferent() {
+ LOG.debug("In finalizeReferent");
+
+ if (REFERENCE_CACHE.remove(this)) {
+ LOG.debug("finalizeReferent - deleting temp file {}", file);
+ deleteFile(file);
+ }
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.io;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for FileBackedOutputStream.
+ *
+ * @author Thomas Pantelis
+ */
+public class FileBackedOutputStreamTest {
+ private static final Logger LOG = LoggerFactory.getLogger(FileBackedOutputStreamTest.class);
+ private static final String TEMP_DIR = "target/FileBackedOutputStreamTest";
+
+ @BeforeClass
+ public static void staticSetup() {
+ File dir = new File(TEMP_DIR);
+ if (!dir.exists() && !dir.mkdirs()) {
+ throw new RuntimeException("Failed to create temp dir " + TEMP_DIR);
+ }
+ }
+
+ @AfterClass
+ public static void staticCleanup() {
+ deleteTempFiles();
+ deleteFile(TEMP_DIR);
+ }
+
+ @Before
+ public void setup() {
+ deleteTempFiles();
+ FileBackedOutputStream.REFERENCE_CACHE.clear();
+ }
+
+ @After
+ public void cleanup() {
+ deleteTempFiles();
+ }
+
+ @Test
+ public void testFileThresholdNotReached() throws IOException {
+ LOG.info("testFileThresholdNotReached starting");
+ try (FileBackedOutputStream fbos = new FileBackedOutputStream(10, TEMP_DIR)) {
+ byte[] bytes = new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9};
+ fbos.write(bytes[0]);
+ fbos.write(bytes, 1, bytes.length - 1);
+
+ assertEquals("getCount", bytes.length, fbos.getCount());
+ assertNull("Found unexpected temp file", findTempFileName());
+ assertEquals("Size", bytes.length, fbos.asByteSource().size());
+
+ // Read bytes twice.
+ assertArrayEquals("Read bytes", bytes, fbos.asByteSource().read());
+ assertArrayEquals("Read bytes", bytes, fbos.asByteSource().read());
+
+ assertEquals("Reference cache size", 0, FileBackedOutputStream.REFERENCE_CACHE.size());
+
+ fbos.cleanup();
+ }
+
+ LOG.info("testFileThresholdNotReached ending");
+ }
+
+ @Test
+ public void testFileThresholdReachedWithWriteBytes() throws IOException {
+ LOG.info("testFileThresholdReachedWithWriteBytes starting");
+ try (FileBackedOutputStream fbos = new FileBackedOutputStream(10, TEMP_DIR)) {
+ byte[] bytes = new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14};
+ fbos.write(bytes[0]);
+ fbos.write(bytes, 1, 11);
+
+ String tempFileName = findTempFileName();
+ assertNotNull("Expected temp file created", tempFileName);
+
+ fbos.write(bytes[12]);
+ fbos.write(bytes, 13, bytes.length - 13);
+
+ assertEquals("Temp file", tempFileName, findTempFileName());
+ assertEquals("Size", bytes.length, fbos.asByteSource().size());
+
+ InputStream inputStream = fbos.asByteSource().openStream();
+
+ assertArrayEquals("Read bytes", bytes, fbos.asByteSource().read());
+
+ byte[] inBytes = new byte[bytes.length];
+ assertEquals("# bytes read", bytes.length, inputStream.read(inBytes));
+ assertArrayEquals("Read InputStream", bytes, inBytes);
+ assertEquals("End of stream", -1, inputStream.read());
+
+ inputStream.close();
+
+ assertEquals("Reference cache size", 1, FileBackedOutputStream.REFERENCE_CACHE.size());
+
+ fbos.cleanup();
+
+ assertEquals("Reference cache size", 0, FileBackedOutputStream.REFERENCE_CACHE.size());
+
+ assertNull("Found unexpected temp file", findTempFileName());
+ }
+
+ LOG.info("testFileThresholdReachedWithWriteBytes ending");
+ }
+
+ @Test
+ public void testFileThresholdReachedWithWriteByte() throws IOException {
+ LOG.info("testFileThresholdReachedWithWriteByte starting");
+ try (FileBackedOutputStream fbos = new FileBackedOutputStream(2, TEMP_DIR)) {
+ byte[] bytes = new byte[]{0, 1, 2};
+ fbos.write(bytes[0]);
+ fbos.write(bytes[1]);
+
+ assertNull("Found unexpected temp file", findTempFileName());
+
+ fbos.write(bytes[2]);
+ fbos.flush();
+
+ assertNotNull("Expected temp file created", findTempFileName());
+
+ assertEquals("Size", bytes.length, fbos.asByteSource().size());
+ assertArrayEquals("Read bytes", bytes, fbos.asByteSource().read());
+ }
+
+ LOG.info("testFileThresholdReachedWithWriteByte ending");
+ }
+
+ @Test(expected = IOException.class)
+ public void testWriteAfterAsByteSource() throws IOException {
+ LOG.info("testWriteAfterAsByteSource starting");
+ try (FileBackedOutputStream fbos = new FileBackedOutputStream(3, TEMP_DIR)) {
+ byte[] bytes = new byte[]{0, 1, 2};
+ fbos.write(bytes);
+
+ assertNull("Found unexpected temp file", findTempFileName());
+ assertEquals("Size", bytes.length, fbos.asByteSource().size());
+
+ // Should throw IOException after call to asByteSource.
+ fbos.write(1);
+ }
+ }
+
+ @Test
+ public void testTempFileDeletedOnGC() throws IOException {
+ LOG.info("testTempFileDeletedOnGC starting");
+
+ FileBackedOutputStream fbos = null;
+ try {
+ fbos = new FileBackedOutputStream(1, TEMP_DIR);
+ fbos.write(new byte[] {0, 1});
+ assertNotNull("Expected temp file created", findTempFileName());
+ } finally {
+ if (fbos != null) {
+ fbos.close();
+ }
+ fbos = null;
+ }
+
+ Stopwatch sw = Stopwatch.createStarted();
+ while (sw.elapsed(TimeUnit.SECONDS) <= 20) {
+ System.gc();
+ if (findTempFileName() == null) {
+ return;
+ }
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+
+ fail("Temp file was not deleted");
+ }
+
+ private static String findTempFileName() {
+ String[] files = new File(TEMP_DIR).list();
+ assertNotNull(files);
+ assertTrue("Found more than one temp file: " + Arrays.toString(files), files.length < 2);
+ return files.length == 1 ? files[0] : null;
+ }
+
+ private static boolean deleteFile(String file) {
+ return new File(file).delete();
+ }
+
+ private static void deleteTempFiles() {
+ String[] files = new File(TEMP_DIR).list();
+ if (files != null) {
+ for (String file: files) {
+ deleteFile(TEMP_DIR + File.separator + file);
+ }
+ }
+ }
+}
setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
+ setTempFileDirectory(other.getTempFileDirectory());
+ setFileBackedStreamingThreshold(other.getFileBackedStreamingThreshold());
}
public static Builder newBuilder() {
return shardManagerPersistenceId;
}
+ public String getTempFileDirectory() {
+ return raftConfig.getTempFileDirectory();
+ }
+
+ private void setTempFileDirectory(String tempFileDirectory) {
+ raftConfig.setTempFileDirectory(tempFileDirectory);
+ }
+
+ public int getFileBackedStreamingThreshold() {
+ return raftConfig.getFileBackedStreamingThreshold();
+ }
+
+ private void setFileBackedStreamingThreshold(int fileBackedStreamingThreshold) {
+ raftConfig.setFileBackedStreamingThreshold(fileBackedStreamingThreshold);
+ }
+
private void setPeerAddressResolver(PeerAddressResolver resolver) {
raftConfig.setPeerAddressResolver(resolver);
}
datastoreContext.setPeerAddressResolver(resolver);
return this;
}
+
+ public Builder tempFileDirectory(String tempFileDirectory) {
+ datastoreContext.setTempFileDirectory(tempFileDirectory);
+ return this;
+ }
+
+ public Builder fileBackedStreamingThresholdInMegabytes(int fileBackedStreamingThreshold) {
+ datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE);
+ return this;
+ }
}
}
return DatastoreContext.newBuilder()
.logicalStoreType(LogicalDatastoreType.CONFIGURATION)
+ .tempFileDirectory("./data")
+ .fileBackedStreamingThresholdInMegabytes(props.getFileBackedStreamingThresholdInMegabytes()
+ .getValue().intValue())
.maxShardDataChangeExecutorPoolSize(props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue())
.maxShardDataChangeExecutorQueueSize(props.getMaxShardDataChangeExecutorQueueSize()
.getValue().intValue())
return DatastoreContext.newBuilder()
.logicalStoreType(LogicalDatastoreType.OPERATIONAL)
+ .tempFileDirectory("./data")
+ .fileBackedStreamingThresholdInMegabytes(props.getFileBackedStreamingThresholdInMegabytes()
+ .getValue().intValue())
.maxShardDataChangeExecutorPoolSize(props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue())
.maxShardDataChangeExecutorQueueSize(props.getMaxShardDataChangeExecutorQueueSize()
.getValue().intValue())
description "Use a newer protocol between the frontend and backend. This feature is considered
exprerimental at this point.";
}
+
+ leaf file-backed-streaming-threshold-in-megabytes {
+ default 128;
+ type non-zero-uint32-type;
+ description "When streaming large amounts of data, eg when sending a snapshot to a follower, this
+ is the threshold in terms of number of megabytes before it should switch from storing in memory to
+ buffering to a file.";
+ }
}
// Augments the 'configuration' choice node under modules/module.