From: Tom Pantelis Date: Sun, 22 Jan 2017 05:38:05 +0000 (-0500) Subject: Bug 7521: Add FileBackedOutputStream and use for snapshot chunking X-Git-Tag: release/carbon~257 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=d796a8de8b208ca24bb57aebfc689f8be8bc2c7b Bug 7521: Add FileBackedOutputStream and use for snapshot chunking Added a FileBackedOutputStream class, similar to the one in guava except: - allows for the temp dir to be confgured - keeps track of the count of bytes written for efficiency - uses a PhantomReference to delete the temp file instead of using finalize. FileBackedOutputStream is now used in chunking the snapshop on the leader side and re-assembling on the follower side. Change-Id: Ieea4bc1388ffe18d6803783d2bb714089716b7b1 Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java index 0e701e3aa5..86ce3113fa 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java @@ -24,6 +24,8 @@ import scala.concurrent.duration.FiniteDuration; * @author Kamal Rameshan */ public interface ConfigParams { + int MEGABYTE = 1048576; + /** * Returns the minimum number of entries to be present in the in-memory Raft log for a snapshot to be taken. * @@ -114,4 +116,19 @@ public interface ConfigParams { */ String getCustomRaftPolicyImplementationClass(); + /** + * Returns the directory in which to create temp files. + * + * @return the directory in which to create temp files. + */ + @Nonnull + String getTempFileDirectory(); + + /** + * Returns the threshold in terms of number of bytes when streaming data before it should switch from storing in + * memory to buffering to a file. + * + * @return the threshold in terms of number of bytes. + */ + int getFileBackedStreamingThreshold(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java index 317ce6a220..56fb633672 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java @@ -68,6 +68,10 @@ public class DefaultConfigParamsImpl implements ConfigParams { private PeerAddressResolver peerAddressResolver = NoopPeerAddressResolver.INSTANCE; + private String tempFileDirectory = ""; + + private int fileBackedStreamingThreshold = 128 * MEGABYTE; + public void setHeartBeatInterval(FiniteDuration heartBeatInterval) { this.heartBeatInterval = heartBeatInterval; electionTimeOutInterval = null; @@ -98,6 +102,14 @@ public class DefaultConfigParamsImpl implements ConfigParams { electionTimeOutInterval = null; } + public void setTempFileDirectory(String tempFileDirectory) { + this.tempFileDirectory = tempFileDirectory; + } + + public void setFileBackedStreamingThreshold(int fileBackedStreamingThreshold) { + this.fileBackedStreamingThreshold = fileBackedStreamingThreshold; + } + public void setCustomRaftPolicyImplementationClass(String customRaftPolicyImplementationClass) { this.customRaftPolicyImplementationClass = customRaftPolicyImplementationClass; } @@ -162,6 +174,16 @@ public class DefaultConfigParamsImpl implements ConfigParams { return policySupplier.get(); } + @Override + public String getTempFileDirectory() { + return tempFileDirectory; + } + + @Override + public int getFileBackedStreamingThreshold() { + return fileBackedStreamingThreshold; + } + private class PolicySupplier implements Supplier { @Override @SuppressWarnings("checkstyle:IllegalCatch") diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java index 74a214f90a..1d59ed1f85 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java @@ -21,6 +21,7 @@ import java.util.function.LongSupplier; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.io.FileBackedOutputStream; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; @@ -326,4 +327,12 @@ public interface RaftActorContext { * @return the Consumer */ Consumer getApplyStateConsumer(); + + /** + * Creates a FileBackedOutputStream with a common configuration. + * + * @return a FileBackedOutputStream instance + */ + @Nonnull + FileBackedOutputStream newFileBackedOutputStream(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index 43a58b9709..b307195a7a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -28,6 +28,7 @@ import java.util.function.Consumer; import java.util.function.LongSupplier; import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.io.FileBackedOutputStream; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; @@ -399,6 +400,12 @@ public class RaftActorContextImpl implements RaftActorContext { return applyStateConsumer; } + @Override + public FileBackedOutputStream newFileBackedOutputStream() { + return new FileBackedOutputStream(configParams.getFileBackedStreamingThreshold(), + configParams.getTempFileDirectory()); + } + @SuppressWarnings("checkstyle:IllegalCatch") void close() { if (currentBehavior != null) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index 12508aebff..71ef8ffcea 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -11,13 +11,13 @@ package org.opendaylight.controller.cluster.raft; import akka.persistence.SnapshotSelectionCriteria; import com.google.common.annotations.VisibleForTesting; import com.google.common.io.ByteSource; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.List; import java.util.Optional; import java.util.function.Consumer; import javax.annotation.Nonnull; +import org.opendaylight.controller.cluster.io.FileBackedOutputStream; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -278,7 +278,7 @@ public class SnapshotManager implements SnapshotState { OutputStream installSnapshotStream = null; if (targetFollower != null) { - installSnapshotStream = new ByteArrayOutputStream(); + installSnapshotStream = context.newFileBackedOutputStream(); log.info("{}: Initiating snapshot capture {} to install on {}", persistenceId(), captureSnapshot, targetFollower); } else { @@ -410,18 +410,17 @@ public class SnapshotManager implements SnapshotState { context.getReplicatedLog().getSnapshotTerm()); if (installSnapshotStream.isPresent()) { - try { - installSnapshotStream.get().close(); - } catch (IOException e) { - log.warn("Error closing install snapshot OutputStream", e); - } - if (context.getId().equals(currentBehavior.getLeaderId())) { - ByteSource snapshotBytes = ByteSource.wrap(((ByteArrayOutputStream)installSnapshotStream.get()) - .toByteArray()); - - // this would be call straight to the leader and won't initiate in serialization - currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(snapshot, snapshotBytes)); + try { + ByteSource snapshotBytes = ((FileBackedOutputStream)installSnapshotStream.get()).asByteSource(); + currentBehavior.handleMessage(context.getActor(), + new SendInstallSnapshot(snapshot, snapshotBytes)); + } catch (IOException e) { + log.error("{}: Snapshot install failed due to an unrecoverable streaming error", + context.getId(), e); + } + } else { + ((FileBackedOutputStream)installSnapshotStream.get()).cleanup(); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 6e5495db0e..30c1264ee8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -139,7 +139,7 @@ public class Follower extends AbstractRaftActorBehavior { if (snapshotTracker != null && !snapshotTracker.getLeaderId().equals(appendEntries.getLeaderId())) { log.debug("{}: snapshot install is in progress but the prior snapshot leaderId {} does not match the " + "AppendEntries leaderId {}", logName(), snapshotTracker.getLeaderId(), appendEntries.getLeaderId()); - snapshotTracker = null; + closeSnapshotTracker(); } if (snapshotTracker != null || context.getSnapshotManager().isApplying()) { @@ -518,7 +518,8 @@ public class Follower extends AbstractRaftActorBehavior { leaderId = installSnapshot.getLeaderId(); if (snapshotTracker == null) { - snapshotTracker = new SnapshotTracker(log, installSnapshot.getTotalChunks(), installSnapshot.getLeaderId()); + snapshotTracker = new SnapshotTracker(log, installSnapshot.getTotalChunks(), installSnapshot.getLeaderId(), + context); } updateInitialSyncStatus(installSnapshot.getLastIncludedIndex(), installSnapshot.getLeaderId()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java index 77c9cb5783..6b7e037a33 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java @@ -11,10 +11,11 @@ package org.opendaylight.controller.cluster.raft.behaviors; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.io.ByteSource; -import com.google.common.io.CountingOutputStream; -import java.io.ByteArrayOutputStream; +import java.io.BufferedOutputStream; import java.io.IOException; import java.util.Arrays; +import org.opendaylight.controller.cluster.io.FileBackedOutputStream; +import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.slf4j.Logger; /** @@ -24,19 +25,19 @@ class SnapshotTracker implements AutoCloseable { private final Logger log; private final int totalChunks; private final String leaderId; - private final CountingOutputStream countingStream; - private final ByteArrayOutputStream backingStream; + private final BufferedOutputStream bufferedStream; + private final FileBackedOutputStream fileBackedStream; private int lastChunkIndex = LeaderInstallSnapshotState.FIRST_CHUNK_INDEX - 1; private boolean sealed = false; private int lastChunkHashCode = LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE; + private long count; - SnapshotTracker(Logger log, int totalChunks, String leaderId) { + SnapshotTracker(Logger log, int totalChunks, String leaderId, RaftActorContext context) { this.log = log; this.totalChunks = totalChunks; this.leaderId = Preconditions.checkNotNull(leaderId); - - backingStream = new ByteArrayOutputStream(); - countingStream = new CountingOutputStream(backingStream); + fileBackedStream = context.newFileBackedOutputStream(); + bufferedStream = new BufferedOutputStream(fileBackedStream); } /** @@ -51,7 +52,7 @@ class SnapshotTracker implements AutoCloseable { boolean addChunk(int chunkIndex, byte[] chunk, Optional maybeLastChunkHashCode) throws InvalidChunkException, IOException { log.debug("addChunk: chunkIndex={}, lastChunkIndex={}, collectedChunks.size={}, lastChunkHashCode={}", - chunkIndex, lastChunkIndex, countingStream.getCount(), this.lastChunkHashCode); + chunkIndex, lastChunkIndex, count, this.lastChunkHashCode); if (sealed) { throw new InvalidChunkException("Invalid chunk received with chunkIndex " + chunkIndex @@ -68,20 +69,22 @@ class SnapshotTracker implements AutoCloseable { + maybeLastChunkHashCode.get()); } - countingStream.write(chunk); + bufferedStream.write(chunk); + count += chunk.length; sealed = chunkIndex == totalChunks; lastChunkIndex = chunkIndex; this.lastChunkHashCode = Arrays.hashCode(chunk); return sealed; } - ByteSource getSnapshotBytes() { + ByteSource getSnapshotBytes() throws IOException { if (!sealed) { throw new IllegalStateException("lastChunk not received yet"); } - return ByteSource.wrap(backingStream.toByteArray()); + bufferedStream.close(); + return fileBackedStream.asByteSource(); } String getLeaderId() { @@ -90,11 +93,7 @@ class SnapshotTracker implements AutoCloseable { @Override public void close() { - try { - countingStream.close(); - } catch (IOException e) { - log.warn("Error closing snapshot stream"); - } + fileBackedStream.cleanup(); } public static class InvalidChunkException extends IOException { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java index 7591c2f7ec..3f96485351 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java @@ -38,6 +38,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.io.FileBackedOutputStream; import org.opendaylight.controller.cluster.raft.SnapshotManager.LastAppliedTermInformationReader; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -96,6 +97,8 @@ public class SnapshotManagerTest extends AbstractActorTest { doReturn(5L).when(mockElectionTerm).getCurrentTerm(); doReturn("member5").when(mockElectionTerm).getVotedFor(); + doReturn(new FileBackedOutputStream(10000000, "target")).when(mockRaftActorContext).newFileBackedOutputStream(); + snapshotManager = new SnapshotManager(mockRaftActorContext, LoggerFactory.getLogger(this.getClass())); factory = new TestActorFactory(getSystem()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java index 281f8071d7..f52d8e4e9d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java @@ -9,6 +9,9 @@ package org.opendaylight.controller.cluster.raft.behaviors; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import com.google.common.base.Optional; import com.google.common.io.ByteSource; @@ -21,21 +24,29 @@ import java.util.Map; import org.apache.commons.lang3.SerializationUtils; import org.junit.Before; import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.io.FileBackedOutputStream; +import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SnapshotTrackerTest { + private static final Logger LOG = LoggerFactory.getLogger(SnapshotTrackerTest.class); - Logger logger = LoggerFactory.getLogger(getClass()); - - Map data; - ByteString byteString; - byte[] chunk1; - byte[] chunk2; - byte[] chunk3; + @Mock + private RaftActorContext mockContext; + private FileBackedOutputStream fbos; + private Map data; + private ByteString byteString; + private byte[] chunk1; + private byte[] chunk2; + private byte[] chunk3; @Before public void setup() { + MockitoAnnotations.initMocks(this); + data = new HashMap<>(); data.put("key1", "value1"); data.put("key2", "value2"); @@ -45,25 +56,28 @@ public class SnapshotTrackerTest { chunk1 = getNextChunk(byteString, 0, 10); chunk2 = getNextChunk(byteString, 10, 10); chunk3 = getNextChunk(byteString, 20, byteString.size()); + + fbos = spy(new FileBackedOutputStream(100000000, "target")); + doReturn(fbos).when(mockContext).newFileBackedOutputStream(); } @Test public void testAddChunks() throws IOException { - SnapshotTracker tracker = new SnapshotTracker(logger, 3, "leader"); - - tracker.addChunk(1, chunk1, Optional.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE)); - tracker.addChunk(2, chunk2, Optional.of(Arrays.hashCode(chunk1))); - tracker.addChunk(3, chunk3, Optional.of(Arrays.hashCode(chunk2))); + try (SnapshotTracker tracker = new SnapshotTracker(LOG, 3, "leader", mockContext)) { + tracker.addChunk(1, chunk1, Optional.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE)); + tracker.addChunk(2, chunk2, Optional.of(Arrays.hashCode(chunk1))); + tracker.addChunk(3, chunk3, Optional.of(Arrays.hashCode(chunk2))); - ByteSource snapshotBytes = tracker.getSnapshotBytes(); - assertEquals("Deserialized", data, SerializationUtils.deserialize(snapshotBytes.read())); + ByteSource snapshotBytes = tracker.getSnapshotBytes(); + assertEquals("Deserialized", data, SerializationUtils.deserialize(snapshotBytes.read())); + } - tracker.close(); + verify(fbos).cleanup(); } @Test(expected = SnapshotTracker.InvalidChunkException.class) public void testAddChunkWhenAlreadySealed() throws IOException { - try (SnapshotTracker tracker = new SnapshotTracker(logger, 2, "leader")) { + try (SnapshotTracker tracker = new SnapshotTracker(LOG, 2, "leader", mockContext)) { tracker.addChunk(1, chunk1, Optional.absent()); tracker.addChunk(2, chunk2, Optional.absent()); tracker.addChunk(3, chunk3, Optional.absent()); @@ -72,14 +86,14 @@ public class SnapshotTrackerTest { @Test(expected = SnapshotTracker.InvalidChunkException.class) public void testInvalidFirstChunkIndex() throws IOException { - try (SnapshotTracker tracker = new SnapshotTracker(logger, 2, "leader")) { + try (SnapshotTracker tracker = new SnapshotTracker(LOG, 2, "leader", mockContext)) { tracker.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX - 1, chunk1, Optional.absent()); } } @Test(expected = SnapshotTracker.InvalidChunkException.class) public void testOutOfSequenceChunk() throws IOException { - try (SnapshotTracker tracker = new SnapshotTracker(logger, 2, "leader")) { + try (SnapshotTracker tracker = new SnapshotTracker(LOG, 2, "leader", mockContext)) { tracker.addChunk(1, chunk1, Optional.absent()); tracker.addChunk(3, chunk3, Optional.absent()); } @@ -87,7 +101,7 @@ public class SnapshotTrackerTest { @Test(expected = SnapshotTracker.InvalidChunkException.class) public void testInvalidLastChunkHashCode() throws IOException { - try (SnapshotTracker tracker = new SnapshotTracker(logger, 2, "leader")) { + try (SnapshotTracker tracker = new SnapshotTracker(LOG, 2, "leader", mockContext)) { tracker.addChunk(1, chunk1, Optional.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE)); tracker.addChunk(2, chunk2, Optional.of(1)); } @@ -95,7 +109,7 @@ public class SnapshotTrackerTest { @Test(expected = IllegalStateException.class) public void testGetSnapshotBytesWhenNotSealed() throws IOException { - try (SnapshotTracker tracker = new SnapshotTracker(logger, 2, "leader")) { + try (SnapshotTracker tracker = new SnapshotTracker(LOG, 2, "leader", mockContext)) { tracker.addChunk(1, chunk1, Optional.absent()); tracker.getSnapshotBytes(); } diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/FileBackedOutputStream.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/FileBackedOutputStream.java new file mode 100644 index 0000000000..452611b500 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/FileBackedOutputStream.java @@ -0,0 +1,281 @@ +/* + * Copyright (c) 2017 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.io; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.FinalizablePhantomReference; +import com.google.common.base.FinalizableReferenceQueue; +import com.google.common.collect.Sets; +import com.google.common.io.ByteSource; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Iterator; +import java.util.Set; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An {@link OutputStream} that starts buffering to a byte array, but switches to file buffering once the data + * reaches a configurable size. This class is thread-safe. + * + * @author Thomas Pantelis + */ +@ThreadSafe +public class FileBackedOutputStream extends OutputStream { + private static final Logger LOG = LoggerFactory.getLogger(FileBackedOutputStream.class); + + /** + * This stores the Cleanup PhantomReference instances statically. This is necessary because PhantomReferences + * need a hard reference so they're not garbage collected. Once finalized, the Cleanup PhantomReference removes + * itself from this map and thus becomes eligible for garbage collection. + */ + @VisibleForTesting + static final Set REFERENCE_CACHE = Sets.newConcurrentHashSet(); + + /** + * Used as the ReferenceQueue for the Cleanup PhantomReferences. + */ + private static final FinalizableReferenceQueue REFERENCE_QUEUE = new FinalizableReferenceQueue(); + + private final int fileThreshold; + private final String fileDirectory; + + @GuardedBy("this") + private MemoryOutputStream memory = new MemoryOutputStream(); + + @GuardedBy("this") + private OutputStream out = memory; + + @GuardedBy("this") + private File file; + + @GuardedBy("this") + private ByteSource source; + + private volatile long count; + + /** + * Creates a new instance that uses the given file threshold, and does not reset the data when the + * {@link ByteSource} returned by {@link #asByteSource} is finalized. + * + * @param fileThreshold the number of bytes before the stream should switch to buffering to a file + * @param fileDirectory the directory in which to create the file if needed. If null, the default temp file + * location is used. + */ + public FileBackedOutputStream(int fileThreshold, @Nullable String fileDirectory) { + this.fileThreshold = fileThreshold; + this.fileDirectory = fileDirectory; + } + + /** + * Returns a readable {@link ByteSource} view of the data that has been written to this stream. This stream is + * closed and further attempts to write to it will result in an IOException. + * + * @return a ByteSource instance + * @throws IOException if close fails + */ + @Nonnull + public synchronized ByteSource asByteSource() throws IOException { + close(); + + if (source == null) { + source = new ByteSource() { + @Override + public InputStream openStream() throws IOException { + synchronized (FileBackedOutputStream.this) { + if (file != null) { + return new FileInputStream(file); + } else { + return new ByteArrayInputStream(memory.getBuffer(), 0, memory.getCount()); + } + } + } + + @Override + public long size() { + return count; + } + }; + } + + return source; + } + + @Override + @SuppressFBWarnings(value = "VO_VOLATILE_INCREMENT", justification = "Findbugs erroneously complains that the " + + "increment of count needs to be atomic even though it is inside a synchronized block.") + public synchronized void write(int value) throws IOException { + possiblySwitchToFile(1); + out.write(value); + count++; + } + + @Override + public synchronized void write(byte[] bytes) throws IOException { + write(bytes, 0, bytes.length); + } + + @Override + public synchronized void write(byte[] bytes, int off, int len) throws IOException { + possiblySwitchToFile(len); + out.write(bytes, off, len); + count += len; + } + + @Override + public synchronized void close() throws IOException { + if (out != null) { + OutputStream closeMe = out; + out = null; + closeMe.close(); + } + } + + @Override + public synchronized void flush() throws IOException { + if (out != null) { + out.flush(); + } + } + + public synchronized long getCount() { + return count; + } + + /** + * Calls {@link #close} if not already closed and, if data was buffered to a file, deletes the file. + */ + public synchronized void cleanup() { + LOG.debug("In cleanup"); + + closeQuietly(); + + if (file != null) { + Iterator iter = REFERENCE_CACHE.iterator(); + while (iter.hasNext()) { + if (file.equals(iter.next().file)) { + iter.remove(); + break; + } + } + + LOG.debug("cleanup - deleting temp file {}", file); + + deleteFile(file); + file = null; + } + } + + @GuardedBy("this") + private void closeQuietly() { + try { + close(); + } catch (IOException e) { + LOG.warn("Error closing output stream {}", out, e); + } + } + + /** + * Checks if writing {@code len} bytes would go over threshold, and switches to file buffering if so. + */ + @GuardedBy("this") + private void possiblySwitchToFile(int len) throws IOException { + if (out == null) { + throw new IOException("Stream already closed"); + } + + if (file == null && memory.getCount() + len > fileThreshold) { + File temp = File.createTempFile("FileBackedOutputStream", null, new File(fileDirectory)); + temp.deleteOnExit(); + + LOG.debug("Byte count {} has exceeded threshold {} - switching to file: {}", memory.getCount() + len, + fileThreshold, temp); + + FileOutputStream transfer = null; + try { + transfer = new FileOutputStream(temp); + transfer.write(memory.getBuffer(), 0, memory.getCount()); + transfer.flush(); + + // We've successfully transferred the data; switch to writing to file + out = transfer; + file = temp; + memory = null; + + new Cleanup(this, file); + } catch (IOException e) { + if (transfer != null) { + try { + transfer.close(); + } catch (IOException ex) { + LOG.debug("Error closing temp file {}", temp, ex); + } + } + + deleteFile(temp); + throw e; + } + } + } + + private static void deleteFile(File file) { + if (!file.delete()) { + LOG.warn("Could not delete temp file {}", file); + } + } + + /** + * ByteArrayOutputStream that exposes its internals for efficiency. + */ + private static class MemoryOutputStream extends ByteArrayOutputStream { + byte[] getBuffer() { + return buf; + } + + int getCount() { + return count; + } + } + + /** + * PhantomReference that deletes the temp file when the FileBackedOutputStream is garbage collected. + */ + private static class Cleanup extends FinalizablePhantomReference { + private final File file; + + Cleanup(FileBackedOutputStream referent, File file) { + super(referent, REFERENCE_QUEUE); + this.file = file; + + REFERENCE_CACHE.add(this); + + LOG.debug("Added Cleanup for temp file {}", file); + } + + @Override + public void finalizeReferent() { + LOG.debug("In finalizeReferent"); + + if (REFERENCE_CACHE.remove(this)) { + LOG.debug("finalizeReferent - deleting temp file {}", file); + deleteFile(file); + } + } + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamTest.java new file mode 100644 index 0000000000..bcb1850d8c --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamTest.java @@ -0,0 +1,214 @@ +/* + * Copyright (c) 2017 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.io; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.Uninterruptibles; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Unit tests for FileBackedOutputStream. + * + * @author Thomas Pantelis + */ +public class FileBackedOutputStreamTest { + private static final Logger LOG = LoggerFactory.getLogger(FileBackedOutputStreamTest.class); + private static final String TEMP_DIR = "target/FileBackedOutputStreamTest"; + + @BeforeClass + public static void staticSetup() { + File dir = new File(TEMP_DIR); + if (!dir.exists() && !dir.mkdirs()) { + throw new RuntimeException("Failed to create temp dir " + TEMP_DIR); + } + } + + @AfterClass + public static void staticCleanup() { + deleteTempFiles(); + deleteFile(TEMP_DIR); + } + + @Before + public void setup() { + deleteTempFiles(); + FileBackedOutputStream.REFERENCE_CACHE.clear(); + } + + @After + public void cleanup() { + deleteTempFiles(); + } + + @Test + public void testFileThresholdNotReached() throws IOException { + LOG.info("testFileThresholdNotReached starting"); + try (FileBackedOutputStream fbos = new FileBackedOutputStream(10, TEMP_DIR)) { + byte[] bytes = new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9}; + fbos.write(bytes[0]); + fbos.write(bytes, 1, bytes.length - 1); + + assertEquals("getCount", bytes.length, fbos.getCount()); + assertNull("Found unexpected temp file", findTempFileName()); + assertEquals("Size", bytes.length, fbos.asByteSource().size()); + + // Read bytes twice. + assertArrayEquals("Read bytes", bytes, fbos.asByteSource().read()); + assertArrayEquals("Read bytes", bytes, fbos.asByteSource().read()); + + assertEquals("Reference cache size", 0, FileBackedOutputStream.REFERENCE_CACHE.size()); + + fbos.cleanup(); + } + + LOG.info("testFileThresholdNotReached ending"); + } + + @Test + public void testFileThresholdReachedWithWriteBytes() throws IOException { + LOG.info("testFileThresholdReachedWithWriteBytes starting"); + try (FileBackedOutputStream fbos = new FileBackedOutputStream(10, TEMP_DIR)) { + byte[] bytes = new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14}; + fbos.write(bytes[0]); + fbos.write(bytes, 1, 11); + + String tempFileName = findTempFileName(); + assertNotNull("Expected temp file created", tempFileName); + + fbos.write(bytes[12]); + fbos.write(bytes, 13, bytes.length - 13); + + assertEquals("Temp file", tempFileName, findTempFileName()); + assertEquals("Size", bytes.length, fbos.asByteSource().size()); + + InputStream inputStream = fbos.asByteSource().openStream(); + + assertArrayEquals("Read bytes", bytes, fbos.asByteSource().read()); + + byte[] inBytes = new byte[bytes.length]; + assertEquals("# bytes read", bytes.length, inputStream.read(inBytes)); + assertArrayEquals("Read InputStream", bytes, inBytes); + assertEquals("End of stream", -1, inputStream.read()); + + inputStream.close(); + + assertEquals("Reference cache size", 1, FileBackedOutputStream.REFERENCE_CACHE.size()); + + fbos.cleanup(); + + assertEquals("Reference cache size", 0, FileBackedOutputStream.REFERENCE_CACHE.size()); + + assertNull("Found unexpected temp file", findTempFileName()); + } + + LOG.info("testFileThresholdReachedWithWriteBytes ending"); + } + + @Test + public void testFileThresholdReachedWithWriteByte() throws IOException { + LOG.info("testFileThresholdReachedWithWriteByte starting"); + try (FileBackedOutputStream fbos = new FileBackedOutputStream(2, TEMP_DIR)) { + byte[] bytes = new byte[]{0, 1, 2}; + fbos.write(bytes[0]); + fbos.write(bytes[1]); + + assertNull("Found unexpected temp file", findTempFileName()); + + fbos.write(bytes[2]); + fbos.flush(); + + assertNotNull("Expected temp file created", findTempFileName()); + + assertEquals("Size", bytes.length, fbos.asByteSource().size()); + assertArrayEquals("Read bytes", bytes, fbos.asByteSource().read()); + } + + LOG.info("testFileThresholdReachedWithWriteByte ending"); + } + + @Test(expected = IOException.class) + public void testWriteAfterAsByteSource() throws IOException { + LOG.info("testWriteAfterAsByteSource starting"); + try (FileBackedOutputStream fbos = new FileBackedOutputStream(3, TEMP_DIR)) { + byte[] bytes = new byte[]{0, 1, 2}; + fbos.write(bytes); + + assertNull("Found unexpected temp file", findTempFileName()); + assertEquals("Size", bytes.length, fbos.asByteSource().size()); + + // Should throw IOException after call to asByteSource. + fbos.write(1); + } + } + + @Test + public void testTempFileDeletedOnGC() throws IOException { + LOG.info("testTempFileDeletedOnGC starting"); + + FileBackedOutputStream fbos = null; + try { + fbos = new FileBackedOutputStream(1, TEMP_DIR); + fbos.write(new byte[] {0, 1}); + assertNotNull("Expected temp file created", findTempFileName()); + } finally { + if (fbos != null) { + fbos.close(); + } + fbos = null; + } + + Stopwatch sw = Stopwatch.createStarted(); + while (sw.elapsed(TimeUnit.SECONDS) <= 20) { + System.gc(); + if (findTempFileName() == null) { + return; + } + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + + fail("Temp file was not deleted"); + } + + private static String findTempFileName() { + String[] files = new File(TEMP_DIR).list(); + assertNotNull(files); + assertTrue("Found more than one temp file: " + Arrays.toString(files), files.length < 2); + return files.length == 1 ? files[0] : null; + } + + private static boolean deleteFile(String file) { + return new File(file).delete(); + } + + private static void deleteTempFiles() { + String[] files = new File(TEMP_DIR).list(); + if (files != null) { + for (String file: files) { + deleteFile(TEMP_DIR + File.separator + file); + } + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java index e889f0a9b3..eeb6ad3155 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java @@ -125,6 +125,8 @@ public class DatastoreContext { setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass()); setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize()); setPeerAddressResolver(other.raftConfig.getPeerAddressResolver()); + setTempFileDirectory(other.getTempFileDirectory()); + setFileBackedStreamingThreshold(other.getFileBackedStreamingThreshold()); } public static Builder newBuilder() { @@ -203,6 +205,22 @@ public class DatastoreContext { return shardManagerPersistenceId; } + public String getTempFileDirectory() { + return raftConfig.getTempFileDirectory(); + } + + private void setTempFileDirectory(String tempFileDirectory) { + raftConfig.setTempFileDirectory(tempFileDirectory); + } + + public int getFileBackedStreamingThreshold() { + return raftConfig.getFileBackedStreamingThreshold(); + } + + private void setFileBackedStreamingThreshold(int fileBackedStreamingThreshold) { + raftConfig.setFileBackedStreamingThreshold(fileBackedStreamingThreshold); + } + private void setPeerAddressResolver(PeerAddressResolver resolver) { raftConfig.setPeerAddressResolver(resolver); } @@ -517,5 +535,15 @@ public class DatastoreContext { datastoreContext.setPeerAddressResolver(resolver); return this; } + + public Builder tempFileDirectory(String tempFileDirectory) { + datastoreContext.setTempFileDirectory(tempFileDirectory); + return this; + } + + public Builder fileBackedStreamingThresholdInMegabytes(int fileBackedStreamingThreshold) { + datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE); + return this; + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java index 18f5681386..0847c981ee 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java @@ -62,6 +62,9 @@ public class DistributedConfigDataStoreProviderModule extends AbstractDistribute return DatastoreContext.newBuilder() .logicalStoreType(LogicalDatastoreType.CONFIGURATION) + .tempFileDirectory("./data") + .fileBackedStreamingThresholdInMegabytes(props.getFileBackedStreamingThresholdInMegabytes() + .getValue().intValue()) .maxShardDataChangeExecutorPoolSize(props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue()) .maxShardDataChangeExecutorQueueSize(props.getMaxShardDataChangeExecutorQueueSize() .getValue().intValue()) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java index ce4e543243..2e97c7ee9a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java @@ -63,6 +63,9 @@ public class DistributedOperationalDataStoreProviderModule return DatastoreContext.newBuilder() .logicalStoreType(LogicalDatastoreType.OPERATIONAL) + .tempFileDirectory("./data") + .fileBackedStreamingThresholdInMegabytes(props.getFileBackedStreamingThresholdInMegabytes() + .getValue().intValue()) .maxShardDataChangeExecutorPoolSize(props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue()) .maxShardDataChangeExecutorQueueSize(props.getMaxShardDataChangeExecutorQueueSize() .getValue().intValue()) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang index 2ae5f18785..d14c828809 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang @@ -240,6 +240,14 @@ module distributed-datastore-provider { description "Use a newer protocol between the frontend and backend. This feature is considered exprerimental at this point."; } + + leaf file-backed-streaming-threshold-in-megabytes { + default 128; + type non-zero-uint32-type; + description "When streaming large amounts of data, eg when sending a snapshot to a follower, this + is the threshold in terms of number of megabytes before it should switch from storing in memory to + buffering to a file."; + } } // Augments the 'configuration' choice node under modules/module.