Bug 7521: Add FileBackedOutputStream and use for snapshot chunking 69/50769/13
authorTom Pantelis <tpanteli@brocade.com>
Sun, 22 Jan 2017 05:38:05 +0000 (00:38 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Fri, 17 Feb 2017 13:56:30 +0000 (13:56 +0000)
Added a FileBackedOutputStream class, similar to the one in guava except:
  - allows for the temp dir to be confgured
  - keeps track of the count of bytes written for efficiency
  - uses a PhantomReference to delete the temp file instead of using finalize.

FileBackedOutputStream is now used in chunking the snapshop on the leader
side and re-assembling on the follower side.

Change-Id: Ieea4bc1388ffe18d6803783d2bb714089716b7b1
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
15 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/FileBackedOutputStream.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang

index 0e701e3..86ce311 100644 (file)
@@ -24,6 +24,8 @@ import scala.concurrent.duration.FiniteDuration;
  * @author Kamal Rameshan
  */
 public interface ConfigParams {
+    int MEGABYTE = 1048576;
+
     /**
      * Returns the minimum number of entries to be present in the in-memory Raft log for a snapshot to be taken.
      *
@@ -114,4 +116,19 @@ public interface ConfigParams {
      */
     String getCustomRaftPolicyImplementationClass();
 
+    /**
+     * Returns the directory in which to create temp files.
+     *
+     * @return the directory in which to create temp files.
+     */
+    @Nonnull
+    String getTempFileDirectory();
+
+    /**
+     * Returns the threshold in terms of number of bytes when streaming data before it should switch from storing in
+     * memory to buffering to a file.
+     *
+     * @return the threshold in terms of number of bytes.
+     */
+    int getFileBackedStreamingThreshold();
 }
index 317ce6a..56fb633 100644 (file)
@@ -68,6 +68,10 @@ public class DefaultConfigParamsImpl implements ConfigParams {
 
     private PeerAddressResolver peerAddressResolver = NoopPeerAddressResolver.INSTANCE;
 
+    private String tempFileDirectory = "";
+
+    private int fileBackedStreamingThreshold = 128 * MEGABYTE;
+
     public void setHeartBeatInterval(FiniteDuration heartBeatInterval) {
         this.heartBeatInterval = heartBeatInterval;
         electionTimeOutInterval = null;
@@ -98,6 +102,14 @@ public class DefaultConfigParamsImpl implements ConfigParams {
         electionTimeOutInterval = null;
     }
 
+    public void setTempFileDirectory(String tempFileDirectory) {
+        this.tempFileDirectory = tempFileDirectory;
+    }
+
+    public void setFileBackedStreamingThreshold(int fileBackedStreamingThreshold) {
+        this.fileBackedStreamingThreshold = fileBackedStreamingThreshold;
+    }
+
     public void setCustomRaftPolicyImplementationClass(String customRaftPolicyImplementationClass) {
         this.customRaftPolicyImplementationClass = customRaftPolicyImplementationClass;
     }
@@ -162,6 +174,16 @@ public class DefaultConfigParamsImpl implements ConfigParams {
         return policySupplier.get();
     }
 
+    @Override
+    public String getTempFileDirectory() {
+        return tempFileDirectory;
+    }
+
+    @Override
+    public int getFileBackedStreamingThreshold() {
+        return fileBackedStreamingThreshold;
+    }
+
     private class PolicySupplier implements Supplier<RaftPolicy> {
         @Override
         @SuppressWarnings("checkstyle:IllegalCatch")
index 74a214f..1d59ed1 100644 (file)
@@ -21,6 +21,7 @@ import java.util.function.LongSupplier;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
@@ -326,4 +327,12 @@ public interface RaftActorContext {
      * @return the Consumer
      */
     Consumer<ApplyState> getApplyStateConsumer();
+
+    /**
+     * Creates a FileBackedOutputStream with a common configuration.
+     *
+     * @return a FileBackedOutputStream instance
+     */
+    @Nonnull
+    FileBackedOutputStream newFileBackedOutputStream();
 }
index 43a58b9..b307195 100644 (file)
@@ -28,6 +28,7 @@ import java.util.function.Consumer;
 import java.util.function.LongSupplier;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
@@ -399,6 +400,12 @@ public class RaftActorContextImpl implements RaftActorContext {
         return applyStateConsumer;
     }
 
+    @Override
+    public FileBackedOutputStream newFileBackedOutputStream() {
+        return new FileBackedOutputStream(configParams.getFileBackedStreamingThreshold(),
+                configParams.getTempFileDirectory());
+    }
+
     @SuppressWarnings("checkstyle:IllegalCatch")
     void close() {
         if (currentBehavior != null) {
index 12508ae..71ef8ff 100644 (file)
@@ -11,13 +11,13 @@ package org.opendaylight.controller.cluster.raft;
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.ByteSource;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.List;
 import java.util.Optional;
 import java.util.function.Consumer;
 import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -278,7 +278,7 @@ public class SnapshotManager implements SnapshotState {
 
             OutputStream installSnapshotStream = null;
             if (targetFollower != null) {
-                installSnapshotStream = new ByteArrayOutputStream();
+                installSnapshotStream = context.newFileBackedOutputStream();
                 log.info("{}: Initiating snapshot capture {} to install on {}",
                         persistenceId(), captureSnapshot, targetFollower);
             } else {
@@ -410,18 +410,17 @@ public class SnapshotManager implements SnapshotState {
                     context.getReplicatedLog().getSnapshotTerm());
 
             if (installSnapshotStream.isPresent()) {
-                try {
-                    installSnapshotStream.get().close();
-                } catch (IOException e) {
-                    log.warn("Error closing install snapshot OutputStream", e);
-                }
-
                 if (context.getId().equals(currentBehavior.getLeaderId())) {
-                    ByteSource snapshotBytes = ByteSource.wrap(((ByteArrayOutputStream)installSnapshotStream.get())
-                            .toByteArray());
-
-                    // this would be call straight to the leader and won't initiate in serialization
-                    currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(snapshot, snapshotBytes));
+                    try {
+                        ByteSource snapshotBytes = ((FileBackedOutputStream)installSnapshotStream.get()).asByteSource();
+                        currentBehavior.handleMessage(context.getActor(),
+                                new SendInstallSnapshot(snapshot, snapshotBytes));
+                    } catch (IOException e) {
+                        log.error("{}: Snapshot install failed due to an unrecoverable streaming error",
+                                context.getId(), e);
+                    }
+                } else {
+                    ((FileBackedOutputStream)installSnapshotStream.get()).cleanup();
                 }
             }
 
index 6e5495d..30c1264 100644 (file)
@@ -139,7 +139,7 @@ public class Follower extends AbstractRaftActorBehavior {
         if (snapshotTracker != null && !snapshotTracker.getLeaderId().equals(appendEntries.getLeaderId())) {
             log.debug("{}: snapshot install is in progress but the prior snapshot leaderId {} does not match the "
                 + "AppendEntries leaderId {}", logName(), snapshotTracker.getLeaderId(), appendEntries.getLeaderId());
-            snapshotTracker = null;
+            closeSnapshotTracker();
         }
 
         if (snapshotTracker != null || context.getSnapshotManager().isApplying()) {
@@ -518,7 +518,8 @@ public class Follower extends AbstractRaftActorBehavior {
         leaderId = installSnapshot.getLeaderId();
 
         if (snapshotTracker == null) {
-            snapshotTracker = new SnapshotTracker(log, installSnapshot.getTotalChunks(), installSnapshot.getLeaderId());
+            snapshotTracker = new SnapshotTracker(log, installSnapshot.getTotalChunks(), installSnapshot.getLeaderId(),
+                    context);
         }
 
         updateInitialSyncStatus(installSnapshot.getLastIncludedIndex(), installSnapshot.getLeaderId());
index 77c9cb5..6b7e037 100644 (file)
@@ -11,10 +11,11 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.io.ByteSource;
-import com.google.common.io.CountingOutputStream;
-import java.io.ByteArrayOutputStream;
+import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.slf4j.Logger;
 
 /**
@@ -24,19 +25,19 @@ class SnapshotTracker implements AutoCloseable {
     private final Logger log;
     private final int totalChunks;
     private final String leaderId;
-    private final CountingOutputStream countingStream;
-    private final ByteArrayOutputStream backingStream;
+    private final BufferedOutputStream bufferedStream;
+    private final FileBackedOutputStream fileBackedStream;
     private int lastChunkIndex = LeaderInstallSnapshotState.FIRST_CHUNK_INDEX - 1;
     private boolean sealed = false;
     private int lastChunkHashCode = LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE;
+    private long count;
 
-    SnapshotTracker(Logger log, int totalChunks, String leaderId) {
+    SnapshotTracker(Logger log, int totalChunks, String leaderId, RaftActorContext context) {
         this.log = log;
         this.totalChunks = totalChunks;
         this.leaderId = Preconditions.checkNotNull(leaderId);
-
-        backingStream = new ByteArrayOutputStream();
-        countingStream = new CountingOutputStream(backingStream);
+        fileBackedStream = context.newFileBackedOutputStream();
+        bufferedStream = new BufferedOutputStream(fileBackedStream);
     }
 
     /**
@@ -51,7 +52,7 @@ class SnapshotTracker implements AutoCloseable {
     boolean addChunk(int chunkIndex, byte[] chunk, Optional<Integer> maybeLastChunkHashCode)
             throws InvalidChunkException, IOException {
         log.debug("addChunk: chunkIndex={}, lastChunkIndex={}, collectedChunks.size={}, lastChunkHashCode={}",
-                chunkIndex, lastChunkIndex, countingStream.getCount(), this.lastChunkHashCode);
+                chunkIndex, lastChunkIndex, count, this.lastChunkHashCode);
 
         if (sealed) {
             throw new InvalidChunkException("Invalid chunk received with chunkIndex " + chunkIndex
@@ -68,20 +69,22 @@ class SnapshotTracker implements AutoCloseable {
                     + maybeLastChunkHashCode.get());
         }
 
-        countingStream.write(chunk);
+        bufferedStream.write(chunk);
 
+        count += chunk.length;
         sealed = chunkIndex == totalChunks;
         lastChunkIndex = chunkIndex;
         this.lastChunkHashCode = Arrays.hashCode(chunk);
         return sealed;
     }
 
-    ByteSource getSnapshotBytes() {
+    ByteSource getSnapshotBytes() throws IOException {
         if (!sealed) {
             throw new IllegalStateException("lastChunk not received yet");
         }
 
-        return ByteSource.wrap(backingStream.toByteArray());
+        bufferedStream.close();
+        return fileBackedStream.asByteSource();
     }
 
     String getLeaderId() {
@@ -90,11 +93,7 @@ class SnapshotTracker implements AutoCloseable {
 
     @Override
     public void close() {
-        try {
-            countingStream.close();
-        } catch (IOException e) {
-            log.warn("Error closing snapshot stream");
-        }
+        fileBackedStream.cleanup();
     }
 
     public static class InvalidChunkException extends IOException {
index 7591c2f..3f96485 100644 (file)
@@ -38,6 +38,7 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
 import org.opendaylight.controller.cluster.raft.SnapshotManager.LastAppliedTermInformationReader;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -96,6 +97,8 @@ public class SnapshotManagerTest extends AbstractActorTest {
         doReturn(5L).when(mockElectionTerm).getCurrentTerm();
         doReturn("member5").when(mockElectionTerm).getVotedFor();
 
+        doReturn(new FileBackedOutputStream(10000000, "target")).when(mockRaftActorContext).newFileBackedOutputStream();
+
         snapshotManager = new SnapshotManager(mockRaftActorContext, LoggerFactory.getLogger(this.getClass()));
         factory = new TestActorFactory(getSystem());
 
index 281f807..f52d8e4 100644 (file)
@@ -9,6 +9,9 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
 import com.google.common.base.Optional;
 import com.google.common.io.ByteSource;
@@ -21,21 +24,29 @@ import java.util.Map;
 import org.apache.commons.lang3.SerializationUtils;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SnapshotTrackerTest {
+    private static final Logger LOG = LoggerFactory.getLogger(SnapshotTrackerTest.class);
 
-    Logger logger = LoggerFactory.getLogger(getClass());
-
-    Map<String, String> data;
-    ByteString byteString;
-    byte[] chunk1;
-    byte[] chunk2;
-    byte[] chunk3;
+    @Mock
+    private RaftActorContext mockContext;
+    private FileBackedOutputStream fbos;
+    private Map<String, String> data;
+    private ByteString byteString;
+    private byte[] chunk1;
+    private byte[] chunk2;
+    private byte[] chunk3;
 
     @Before
     public void setup() {
+        MockitoAnnotations.initMocks(this);
+
         data = new HashMap<>();
         data.put("key1", "value1");
         data.put("key2", "value2");
@@ -45,25 +56,28 @@ public class SnapshotTrackerTest {
         chunk1 = getNextChunk(byteString, 0, 10);
         chunk2 = getNextChunk(byteString, 10, 10);
         chunk3 = getNextChunk(byteString, 20, byteString.size());
+
+        fbos = spy(new FileBackedOutputStream(100000000, "target"));
+        doReturn(fbos).when(mockContext).newFileBackedOutputStream();
     }
 
     @Test
     public void testAddChunks() throws IOException {
-        SnapshotTracker tracker = new SnapshotTracker(logger, 3, "leader");
-
-        tracker.addChunk(1, chunk1, Optional.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE));
-        tracker.addChunk(2, chunk2, Optional.of(Arrays.hashCode(chunk1)));
-        tracker.addChunk(3, chunk3, Optional.of(Arrays.hashCode(chunk2)));
+        try (SnapshotTracker tracker = new SnapshotTracker(LOG, 3, "leader", mockContext)) {
+            tracker.addChunk(1, chunk1, Optional.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE));
+            tracker.addChunk(2, chunk2, Optional.of(Arrays.hashCode(chunk1)));
+            tracker.addChunk(3, chunk3, Optional.of(Arrays.hashCode(chunk2)));
 
-        ByteSource snapshotBytes = tracker.getSnapshotBytes();
-        assertEquals("Deserialized", data, SerializationUtils.deserialize(snapshotBytes.read()));
+            ByteSource snapshotBytes = tracker.getSnapshotBytes();
+            assertEquals("Deserialized", data, SerializationUtils.deserialize(snapshotBytes.read()));
+        }
 
-        tracker.close();
+        verify(fbos).cleanup();
     }
 
     @Test(expected = SnapshotTracker.InvalidChunkException.class)
     public void testAddChunkWhenAlreadySealed() throws IOException {
-        try (SnapshotTracker tracker = new SnapshotTracker(logger, 2, "leader")) {
+        try (SnapshotTracker tracker = new SnapshotTracker(LOG, 2, "leader", mockContext)) {
             tracker.addChunk(1, chunk1, Optional.<Integer>absent());
             tracker.addChunk(2, chunk2, Optional.<Integer>absent());
             tracker.addChunk(3, chunk3, Optional.<Integer>absent());
@@ -72,14 +86,14 @@ public class SnapshotTrackerTest {
 
     @Test(expected = SnapshotTracker.InvalidChunkException.class)
     public void testInvalidFirstChunkIndex() throws IOException {
-        try (SnapshotTracker tracker = new SnapshotTracker(logger, 2, "leader")) {
+        try (SnapshotTracker tracker = new SnapshotTracker(LOG, 2, "leader", mockContext)) {
             tracker.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX - 1, chunk1, Optional.<Integer>absent());
         }
     }
 
     @Test(expected = SnapshotTracker.InvalidChunkException.class)
     public void testOutOfSequenceChunk() throws IOException {
-        try (SnapshotTracker tracker = new SnapshotTracker(logger, 2, "leader")) {
+        try (SnapshotTracker tracker = new SnapshotTracker(LOG, 2, "leader", mockContext)) {
             tracker.addChunk(1, chunk1, Optional.<Integer>absent());
             tracker.addChunk(3, chunk3, Optional.<Integer>absent());
         }
@@ -87,7 +101,7 @@ public class SnapshotTrackerTest {
 
     @Test(expected = SnapshotTracker.InvalidChunkException.class)
     public void testInvalidLastChunkHashCode() throws IOException {
-        try (SnapshotTracker tracker = new SnapshotTracker(logger, 2, "leader")) {
+        try (SnapshotTracker tracker = new SnapshotTracker(LOG, 2, "leader", mockContext)) {
             tracker.addChunk(1, chunk1, Optional.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE));
             tracker.addChunk(2, chunk2, Optional.of(1));
         }
@@ -95,7 +109,7 @@ public class SnapshotTrackerTest {
 
     @Test(expected = IllegalStateException.class)
     public void testGetSnapshotBytesWhenNotSealed() throws IOException {
-        try (SnapshotTracker tracker = new SnapshotTracker(logger, 2, "leader")) {
+        try (SnapshotTracker tracker = new SnapshotTracker(LOG, 2, "leader", mockContext)) {
             tracker.addChunk(1, chunk1, Optional.<Integer>absent());
             tracker.getSnapshotBytes();
         }
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/FileBackedOutputStream.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/FileBackedOutputStream.java
new file mode 100644 (file)
index 0000000..452611b
--- /dev/null
@@ -0,0 +1,281 @@
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.io;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.FinalizablePhantomReference;
+import com.google.common.base.FinalizableReferenceQueue;
+import com.google.common.collect.Sets;
+import com.google.common.io.ByteSource;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.Set;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link OutputStream} that starts buffering to a byte array, but switches to file buffering once the data
+ * reaches a configurable size. This class is thread-safe.
+ *
+ * @author Thomas Pantelis
+ */
+@ThreadSafe
+public class FileBackedOutputStream extends OutputStream {
+    private static final Logger LOG = LoggerFactory.getLogger(FileBackedOutputStream.class);
+
+    /**
+     * This stores the Cleanup PhantomReference instances statically. This is necessary because PhantomReferences
+     * need a hard reference so they're not garbage collected. Once finalized, the Cleanup PhantomReference removes
+     * itself from this map and thus becomes eligible for garbage collection.
+     */
+    @VisibleForTesting
+    static final Set<Cleanup> REFERENCE_CACHE = Sets.newConcurrentHashSet();
+
+    /**
+     * Used as the ReferenceQueue for the Cleanup PhantomReferences.
+     */
+    private static final FinalizableReferenceQueue REFERENCE_QUEUE = new FinalizableReferenceQueue();
+
+    private final int fileThreshold;
+    private final String fileDirectory;
+
+    @GuardedBy("this")
+    private MemoryOutputStream memory = new MemoryOutputStream();
+
+    @GuardedBy("this")
+    private OutputStream out = memory;
+
+    @GuardedBy("this")
+    private File file;
+
+    @GuardedBy("this")
+    private ByteSource source;
+
+    private volatile long count;
+
+    /**
+     * Creates a new instance that uses the given file threshold, and does not reset the data when the
+     * {@link ByteSource} returned by {@link #asByteSource} is finalized.
+     *
+     * @param fileThreshold the number of bytes before the stream should switch to buffering to a file
+     * @param fileDirectory the directory in which to create the file if needed. If null, the default temp file
+     *                      location is used.
+     */
+    public FileBackedOutputStream(int fileThreshold, @Nullable String fileDirectory) {
+        this.fileThreshold = fileThreshold;
+        this.fileDirectory = fileDirectory;
+    }
+
+    /**
+     * Returns a readable {@link ByteSource} view of the data that has been written to this stream. This stream is
+     * closed and further attempts to write to it will result in an IOException.
+     *
+     * @return a ByteSource instance
+     * @throws IOException if close fails
+     */
+    @Nonnull
+    public synchronized ByteSource asByteSource() throws IOException {
+        close();
+
+        if (source == null) {
+            source = new ByteSource() {
+                @Override
+                public InputStream openStream() throws IOException {
+                    synchronized (FileBackedOutputStream.this) {
+                        if (file != null) {
+                            return new FileInputStream(file);
+                        } else {
+                            return new ByteArrayInputStream(memory.getBuffer(), 0, memory.getCount());
+                        }
+                    }
+                }
+
+                @Override
+                public long size() {
+                    return count;
+                }
+            };
+        }
+
+        return source;
+    }
+
+    @Override
+    @SuppressFBWarnings(value = "VO_VOLATILE_INCREMENT", justification = "Findbugs erroneously complains that the "
+        + "increment of count needs to be atomic even though it is inside a synchronized block.")
+    public synchronized void write(int value) throws IOException {
+        possiblySwitchToFile(1);
+        out.write(value);
+        count++;
+    }
+
+    @Override
+    public synchronized void write(byte[] bytes) throws IOException {
+        write(bytes, 0, bytes.length);
+    }
+
+    @Override
+    public synchronized void write(byte[] bytes, int off, int len) throws IOException {
+        possiblySwitchToFile(len);
+        out.write(bytes, off, len);
+        count += len;
+    }
+
+    @Override
+    public synchronized void close() throws IOException {
+        if (out != null) {
+            OutputStream closeMe = out;
+            out = null;
+            closeMe.close();
+        }
+    }
+
+    @Override
+    public synchronized void flush() throws IOException {
+        if (out != null) {
+            out.flush();
+        }
+    }
+
+    public synchronized long getCount() {
+        return count;
+    }
+
+    /**
+     * Calls {@link #close} if not already closed and, if data was buffered to a file, deletes the file.
+     */
+    public synchronized void cleanup() {
+        LOG.debug("In cleanup");
+
+        closeQuietly();
+
+        if (file != null) {
+            Iterator<Cleanup> iter = REFERENCE_CACHE.iterator();
+            while (iter.hasNext()) {
+                if (file.equals(iter.next().file)) {
+                    iter.remove();
+                    break;
+                }
+            }
+
+            LOG.debug("cleanup - deleting temp file {}", file);
+
+            deleteFile(file);
+            file = null;
+        }
+    }
+
+    @GuardedBy("this")
+    private void closeQuietly() {
+        try {
+            close();
+        } catch (IOException e) {
+            LOG.warn("Error closing output stream {}", out, e);
+        }
+    }
+
+    /**
+     * Checks if writing {@code len} bytes would go over threshold, and switches to file buffering if so.
+     */
+    @GuardedBy("this")
+    private void possiblySwitchToFile(int len) throws IOException {
+        if (out == null) {
+            throw new IOException("Stream already closed");
+        }
+
+        if (file == null && memory.getCount() + len > fileThreshold) {
+            File temp = File.createTempFile("FileBackedOutputStream", null, new File(fileDirectory));
+            temp.deleteOnExit();
+
+            LOG.debug("Byte count {} has exceeded threshold {} - switching to file: {}", memory.getCount() + len,
+                    fileThreshold, temp);
+
+            FileOutputStream transfer = null;
+            try {
+                transfer = new FileOutputStream(temp);
+                transfer.write(memory.getBuffer(), 0, memory.getCount());
+                transfer.flush();
+
+                // We've successfully transferred the data; switch to writing to file
+                out = transfer;
+                file = temp;
+                memory = null;
+
+                new Cleanup(this, file);
+            } catch (IOException e) {
+                if (transfer != null) {
+                    try {
+                        transfer.close();
+                    } catch (IOException ex) {
+                        LOG.debug("Error closing temp file {}", temp, ex);
+                    }
+                }
+
+                deleteFile(temp);
+                throw e;
+            }
+        }
+    }
+
+    private static void deleteFile(File file) {
+        if (!file.delete()) {
+            LOG.warn("Could not delete temp file {}", file);
+        }
+    }
+
+    /**
+     * ByteArrayOutputStream that exposes its internals for efficiency.
+     */
+    private static class MemoryOutputStream extends ByteArrayOutputStream {
+        byte[] getBuffer() {
+            return buf;
+        }
+
+        int getCount() {
+            return count;
+        }
+    }
+
+    /**
+     * PhantomReference that deletes the temp file when the FileBackedOutputStream is garbage collected.
+     */
+    private static class Cleanup extends FinalizablePhantomReference<FileBackedOutputStream> {
+        private final File file;
+
+        Cleanup(FileBackedOutputStream referent, File file) {
+            super(referent, REFERENCE_QUEUE);
+            this.file = file;
+
+            REFERENCE_CACHE.add(this);
+
+            LOG.debug("Added Cleanup for temp file {}", file);
+        }
+
+        @Override
+        public void finalizeReferent() {
+            LOG.debug("In finalizeReferent");
+
+            if (REFERENCE_CACHE.remove(this)) {
+                LOG.debug("finalizeReferent - deleting temp file {}", file);
+                deleteFile(file);
+            }
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamTest.java
new file mode 100644 (file)
index 0000000..bcb1850
--- /dev/null
@@ -0,0 +1,214 @@
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.io;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for FileBackedOutputStream.
+ *
+ * @author Thomas Pantelis
+ */
+public class FileBackedOutputStreamTest {
+    private static final Logger LOG = LoggerFactory.getLogger(FileBackedOutputStreamTest.class);
+    private static final String TEMP_DIR = "target/FileBackedOutputStreamTest";
+
+    @BeforeClass
+    public static void staticSetup() {
+        File dir = new File(TEMP_DIR);
+        if (!dir.exists() && !dir.mkdirs()) {
+            throw new RuntimeException("Failed to create temp dir " + TEMP_DIR);
+        }
+    }
+
+    @AfterClass
+    public static void staticCleanup() {
+        deleteTempFiles();
+        deleteFile(TEMP_DIR);
+    }
+
+    @Before
+    public void setup() {
+        deleteTempFiles();
+        FileBackedOutputStream.REFERENCE_CACHE.clear();
+    }
+
+    @After
+    public void cleanup() {
+        deleteTempFiles();
+    }
+
+    @Test
+    public void testFileThresholdNotReached() throws IOException {
+        LOG.info("testFileThresholdNotReached starting");
+        try (FileBackedOutputStream fbos = new FileBackedOutputStream(10, TEMP_DIR)) {
+            byte[] bytes = new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9};
+            fbos.write(bytes[0]);
+            fbos.write(bytes, 1, bytes.length - 1);
+
+            assertEquals("getCount", bytes.length, fbos.getCount());
+            assertNull("Found unexpected temp file", findTempFileName());
+            assertEquals("Size", bytes.length, fbos.asByteSource().size());
+
+            // Read bytes twice.
+            assertArrayEquals("Read bytes", bytes, fbos.asByteSource().read());
+            assertArrayEquals("Read bytes", bytes, fbos.asByteSource().read());
+
+            assertEquals("Reference cache size", 0, FileBackedOutputStream.REFERENCE_CACHE.size());
+
+            fbos.cleanup();
+        }
+
+        LOG.info("testFileThresholdNotReached ending");
+    }
+
+    @Test
+    public void testFileThresholdReachedWithWriteBytes() throws IOException {
+        LOG.info("testFileThresholdReachedWithWriteBytes starting");
+        try (FileBackedOutputStream fbos = new FileBackedOutputStream(10, TEMP_DIR)) {
+            byte[] bytes = new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14};
+            fbos.write(bytes[0]);
+            fbos.write(bytes, 1, 11);
+
+            String tempFileName = findTempFileName();
+            assertNotNull("Expected temp file created", tempFileName);
+
+            fbos.write(bytes[12]);
+            fbos.write(bytes, 13, bytes.length - 13);
+
+            assertEquals("Temp file", tempFileName, findTempFileName());
+            assertEquals("Size", bytes.length, fbos.asByteSource().size());
+
+            InputStream inputStream = fbos.asByteSource().openStream();
+
+            assertArrayEquals("Read bytes", bytes, fbos.asByteSource().read());
+
+            byte[] inBytes = new byte[bytes.length];
+            assertEquals("# bytes read", bytes.length, inputStream.read(inBytes));
+            assertArrayEquals("Read InputStream", bytes, inBytes);
+            assertEquals("End of stream", -1, inputStream.read());
+
+            inputStream.close();
+
+            assertEquals("Reference cache size", 1, FileBackedOutputStream.REFERENCE_CACHE.size());
+
+            fbos.cleanup();
+
+            assertEquals("Reference cache size", 0, FileBackedOutputStream.REFERENCE_CACHE.size());
+
+            assertNull("Found unexpected temp file", findTempFileName());
+        }
+
+        LOG.info("testFileThresholdReachedWithWriteBytes ending");
+    }
+
+    @Test
+    public void testFileThresholdReachedWithWriteByte() throws IOException {
+        LOG.info("testFileThresholdReachedWithWriteByte starting");
+        try (FileBackedOutputStream fbos = new FileBackedOutputStream(2, TEMP_DIR)) {
+            byte[] bytes = new byte[]{0, 1, 2};
+            fbos.write(bytes[0]);
+            fbos.write(bytes[1]);
+
+            assertNull("Found unexpected temp file", findTempFileName());
+
+            fbos.write(bytes[2]);
+            fbos.flush();
+
+            assertNotNull("Expected temp file created", findTempFileName());
+
+            assertEquals("Size", bytes.length, fbos.asByteSource().size());
+            assertArrayEquals("Read bytes", bytes, fbos.asByteSource().read());
+        }
+
+        LOG.info("testFileThresholdReachedWithWriteByte ending");
+    }
+
+    @Test(expected = IOException.class)
+    public void testWriteAfterAsByteSource() throws IOException {
+        LOG.info("testWriteAfterAsByteSource starting");
+        try (FileBackedOutputStream fbos = new FileBackedOutputStream(3, TEMP_DIR)) {
+            byte[] bytes = new byte[]{0, 1, 2};
+            fbos.write(bytes);
+
+            assertNull("Found unexpected temp file", findTempFileName());
+            assertEquals("Size", bytes.length, fbos.asByteSource().size());
+
+            // Should throw IOException after call to asByteSource.
+            fbos.write(1);
+        }
+    }
+
+    @Test
+    public void testTempFileDeletedOnGC() throws IOException {
+        LOG.info("testTempFileDeletedOnGC starting");
+
+        FileBackedOutputStream fbos = null;
+        try {
+            fbos = new FileBackedOutputStream(1, TEMP_DIR);
+            fbos.write(new byte[] {0, 1});
+            assertNotNull("Expected temp file created", findTempFileName());
+        } finally {
+            if (fbos != null) {
+                fbos.close();
+            }
+            fbos = null;
+        }
+
+        Stopwatch sw = Stopwatch.createStarted();
+        while (sw.elapsed(TimeUnit.SECONDS) <= 20) {
+            System.gc();
+            if (findTempFileName() == null) {
+                return;
+            }
+            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+        }
+
+        fail("Temp file was not deleted");
+    }
+
+    private static String findTempFileName() {
+        String[] files = new File(TEMP_DIR).list();
+        assertNotNull(files);
+        assertTrue("Found more than one temp file: " + Arrays.toString(files), files.length < 2);
+        return files.length == 1 ? files[0] : null;
+    }
+
+    private static boolean deleteFile(String file) {
+        return new File(file).delete();
+    }
+
+    private static void deleteTempFiles() {
+        String[] files = new File(TEMP_DIR).list();
+        if (files != null) {
+            for (String file: files) {
+                deleteFile(TEMP_DIR + File.separator + file);
+            }
+        }
+    }
+}
index e889f0a..eeb6ad3 100644 (file)
@@ -125,6 +125,8 @@ public class DatastoreContext {
         setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
         setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
         setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
+        setTempFileDirectory(other.getTempFileDirectory());
+        setFileBackedStreamingThreshold(other.getFileBackedStreamingThreshold());
     }
 
     public static Builder newBuilder() {
@@ -203,6 +205,22 @@ public class DatastoreContext {
         return shardManagerPersistenceId;
     }
 
+    public String getTempFileDirectory() {
+        return raftConfig.getTempFileDirectory();
+    }
+
+    private void setTempFileDirectory(String tempFileDirectory) {
+        raftConfig.setTempFileDirectory(tempFileDirectory);
+    }
+
+    public int getFileBackedStreamingThreshold() {
+        return raftConfig.getFileBackedStreamingThreshold();
+    }
+
+    private void setFileBackedStreamingThreshold(int fileBackedStreamingThreshold) {
+        raftConfig.setFileBackedStreamingThreshold(fileBackedStreamingThreshold);
+    }
+
     private void setPeerAddressResolver(PeerAddressResolver resolver) {
         raftConfig.setPeerAddressResolver(resolver);
     }
@@ -517,5 +535,15 @@ public class DatastoreContext {
             datastoreContext.setPeerAddressResolver(resolver);
             return this;
         }
+
+        public Builder tempFileDirectory(String tempFileDirectory) {
+            datastoreContext.setTempFileDirectory(tempFileDirectory);
+            return this;
+        }
+
+        public Builder fileBackedStreamingThresholdInMegabytes(int  fileBackedStreamingThreshold) {
+            datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE);
+            return this;
+        }
     }
 }
index 18f5681..0847c98 100644 (file)
@@ -62,6 +62,9 @@ public class DistributedConfigDataStoreProviderModule extends AbstractDistribute
 
         return DatastoreContext.newBuilder()
                 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
+                .tempFileDirectory("./data")
+                .fileBackedStreamingThresholdInMegabytes(props.getFileBackedStreamingThresholdInMegabytes()
+                        .getValue().intValue())
                 .maxShardDataChangeExecutorPoolSize(props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue())
                 .maxShardDataChangeExecutorQueueSize(props.getMaxShardDataChangeExecutorQueueSize()
                         .getValue().intValue())
index ce4e543..2e97c7e 100644 (file)
@@ -63,6 +63,9 @@ public class DistributedOperationalDataStoreProviderModule
 
         return DatastoreContext.newBuilder()
                 .logicalStoreType(LogicalDatastoreType.OPERATIONAL)
+                .tempFileDirectory("./data")
+                .fileBackedStreamingThresholdInMegabytes(props.getFileBackedStreamingThresholdInMegabytes()
+                        .getValue().intValue())
                 .maxShardDataChangeExecutorPoolSize(props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue())
                 .maxShardDataChangeExecutorQueueSize(props.getMaxShardDataChangeExecutorQueueSize()
                         .getValue().intValue())
index 2ae5f18..d14c828 100644 (file)
@@ -240,6 +240,14 @@ module distributed-datastore-provider {
             description "Use a newer protocol between the frontend and backend. This feature is considered
                          exprerimental at this point.";
         }
+
+        leaf file-backed-streaming-threshold-in-megabytes {
+            default 128;
+            type non-zero-uint32-type;
+            description "When streaming large amounts of data, eg when sending a snapshot to a follower, this
+                is the threshold in terms of number of megabytes before it should switch from storing in memory to
+                buffering to a file.";
+        }
     }
 
     // Augments the 'configuration' choice node under modules/module.

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.