Bug 2890: Chunk AppendEntries when single payload size exceeds threshold 01/57301/12
authorTom Pantelis <tompantelis@gmail.com>
Thu, 18 May 2017 13:23:04 +0000 (09:23 -0400)
committerRobert Varga <nite@hq.sk>
Thu, 29 Jun 2017 15:48:09 +0000 (15:48 +0000)
Utilizes the MessageSlicer in AbstractLeader to slice/chunk AppendEntries
messages whose single log entry payloas exceeds the max size threshold.
The MessageAssembler is used in the Follower to re-assemble.

For efficiency, with multiple followers, the AbstractLeader reuses the
FileBackedOutputStream containing the serialized AppendEntries data.
However, since the MessageSlicer takes ownership of the FileBackedOutputStream
and cleans it up when slicing is complete, I added a
SharedFileBackedOutputStream class that maintains a usage count and
performs cleanup when the usage count reaches 0. The AbstractLeader maintains
a Map of SharedFileBackedOutputStream instances keyed by log index.

The FollowerLogInformation keeps track of whether or not slicing is in
progress for the follower. Same as with install snapshot, we only want to send
empty AppendEntries as heartbeats.

Change-Id: Id163944b9989f6cb39a6aaaa98d1f3c4b0026bbe
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
20 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerIdentifier.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/ServerConfigurationPayload.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationWithSlicedPayloadIntegrationTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerIdentifierTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamFactory.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/SharedFileBackedOutputStream.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamTest.java
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/io/SharedFileBackedOutputStreamTest.java [new file with mode: 0644]

index 123d206..6f9efbb 100644 (file)
@@ -16,6 +16,7 @@ import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotS
  * The state of the followers log as known by the Leader.
  */
 public interface FollowerLogInformation {
+    long NO_INDEX = -1;
 
     /**
      * Increments the value of the follower's next index.
@@ -160,4 +161,19 @@ public interface FollowerLogInformation {
      * Clears the LeaderInstallSnapshotState when an install snapshot is complete.
      */
     void clearLeaderInstallSnapshotState();
+
+    /**
+     * Sets the index of the log entry whose payload size exceeds the maximum size for a single message and thus
+     * needs to be sliced into smaller chunks.
+     *
+     * @param index the log entry index or NO_INDEX to clear it
+     */
+    void setSlicedLogEntryIndex(long index);
+
+    /**
+     * Return whether or not log entry slicing is currently in progress.
+     *
+     * @return true if slicing is currently in progress, false otherwise
+     */
+    boolean isLogEntrySlicingInProgress();
 }
index 883bfbb..8bd0cf3 100644 (file)
@@ -46,6 +46,8 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     private LeaderInstallSnapshotState installSnapshotState;
 
+    private long slicedLogEntryIndex = NO_INDEX;
+
     /**
      * Constructs an instance.
      *
@@ -92,6 +94,12 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     @Override
     public boolean setMatchIndex(long matchIndex) {
+        // If the new match index is the index of the entry currently being sliced, then we know slicing is complete
+        // and the follower received the entry and responded so clear the slicedLogEntryIndex
+        if (isLogEntrySlicingInProgress() && slicedLogEntryIndex == matchIndex) {
+            slicedLogEntryIndex = NO_INDEX;
+        }
+
         if (this.matchIndex != matchIndex) {
             this.matchIndex = matchIndex;
             return true;
@@ -210,6 +218,16 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
         installSnapshotState = null;
     }
 
+    @Override
+    public void setSlicedLogEntryIndex(long index) {
+        slicedLogEntryIndex  = index;
+    }
+
+    @Override
+    public boolean isLogEntrySlicingInProgress() {
+        return slicedLogEntryIndex != NO_INDEX;
+    }
+
     @Override
     public String toString() {
         return "FollowerLogInformationImpl [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
index 0fe6cf1..aed050c 100644 (file)
@@ -21,7 +21,7 @@ import java.util.function.LongSupplier;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
@@ -329,12 +329,12 @@ public interface RaftActorContext {
     Consumer<ApplyState> getApplyStateConsumer();
 
     /**
-     * Creates a FileBackedOutputStream with a common configuration.
+     * Returns the {@link FileBackedOutputStreamFactory} instance with a common configuration.
      *
-     * @return a FileBackedOutputStream instance
+     * @return the {@link FileBackedOutputStreamFactory};
      */
     @Nonnull
-    FileBackedOutputStream newFileBackedOutputStream();
+    FileBackedOutputStreamFactory getFileBackedOutputStreamFactory();
 
     /**
      * Returns the RaftActorLeadershipTransferCohort if leadership transfer is in progress.
index 5b130db..4fd5666 100644 (file)
@@ -29,7 +29,7 @@ import java.util.function.LongSupplier;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
@@ -89,6 +89,8 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private final Consumer<ApplyState> applyStateConsumer;
 
+    private final FileBackedOutputStreamFactory fileBackedOutputStreamFactory;
+
     private RaftActorLeadershipTransferCohort leadershipTransferCohort;
 
     public RaftActorContextImpl(ActorRef actor, ActorContext context, String id,
@@ -107,6 +109,9 @@ public class RaftActorContextImpl implements RaftActorContext {
         this.log = Preconditions.checkNotNull(logger);
         this.applyStateConsumer = Preconditions.checkNotNull(applyStateConsumer);
 
+        fileBackedOutputStreamFactory = new FileBackedOutputStreamFactory(
+                configParams.getFileBackedStreamingThreshold(), configParams.getTempFileDirectory());
+
         for (Map.Entry<String, String> e: Preconditions.checkNotNull(peerAddresses).entrySet()) {
             peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING));
         }
@@ -404,9 +409,8 @@ public class RaftActorContextImpl implements RaftActorContext {
     }
 
     @Override
-    public FileBackedOutputStream newFileBackedOutputStream() {
-        return new FileBackedOutputStream(configParams.getFileBackedStreamingThreshold(),
-                configParams.getTempFileDirectory());
+    public FileBackedOutputStreamFactory getFileBackedOutputStreamFactory() {
+        return fileBackedOutputStreamFactory;
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
index 71ef8ff..7ff47f9 100644 (file)
@@ -278,7 +278,7 @@ public class SnapshotManager implements SnapshotState {
 
             OutputStream installSnapshotStream = null;
             if (targetFollower != null) {
-                installSnapshotStream = context.newFileBackedOutputStream();
+                installSnapshotStream = context.getFileBackedOutputStreamFactory().newInstance();
                 log.info("{}: Initiating snapshot capture {} to install on {}",
                         persistenceId(), captureSnapshot, targetFollower);
             } else {
index 097f0ec..9fc7b33 100644 (file)
@@ -17,6 +17,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.io.ByteSource;
 import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -26,7 +27,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.io.SharedFileBackedOutputStream;
+import org.opendaylight.controller.cluster.messaging.MessageSlicer;
+import org.opendaylight.controller.cluster.messaging.SliceOptions;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
@@ -85,6 +90,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      */
     private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
 
+    /**
+     * Map of serialized AppendEntries output streams keyed by log index. This is used in conjunction with the
+     * appendEntriesMessageSlicer for slicing single ReplicatedLogEntry payloads that exceed the message size threshold.
+     * This Map allows the SharedFileBackedOutputStreams to be reused for multiple followers.
+     */
+    private final Map<Long, SharedFileBackedOutputStream> sharedSerializedAppendEntriesStreams = new HashMap<>();
+    private final MessageSlicer appendEntriesMessageSlicer;
+
     private Cancellable heartbeatSchedule = null;
     private Optional<SnapshotHolder> snapshotHolder = Optional.absent();
     private int minReplicationCount;
@@ -93,6 +106,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             @Nullable AbstractLeader initializeFromLeader) {
         super(context, state);
 
+        appendEntriesMessageSlicer = MessageSlicer.builder().logContext(logName())
+            .messageSliceSize(context.getConfigParams().getSnapshotChunkSize())
+            .expireStateAfterInactivity(context.getConfigParams().getElectionTimeOutInterval().toMillis() * 3,
+                    TimeUnit.MILLISECONDS).build();
+
         if (initializeFromLeader != null) {
             followerToLog.putAll(initializeFromLeader.followerToLog);
             snapshotHolder = initializeFromLeader.snapshotHolder;
@@ -438,6 +456,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
         Preconditions.checkNotNull(sender, "sender should not be null");
 
+        if (appendEntriesMessageSlicer.handleMessage(message)) {
+            return this;
+        }
+
         if (message instanceof RaftRPC) {
             RaftRPC rpc = (RaftRPC) message;
             // If RPC request or response contains term T > currentTerm:
@@ -644,6 +666,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     // we send a heartbeat even if we have not received a reply for the last chunk
                     sendAppendEntries = true;
                 }
+            } else if (followerLogInformation.isLogEntrySlicingInProgress()) {
+                sendAppendEntries = sendHeartbeat;
             } else {
                 long leaderLastIndex = context.getReplicatedLog().lastIndex();
                 long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
@@ -660,11 +684,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                             followerNextIndex, followerId);
 
                     if (followerLogInformation.okToReplicate()) {
-                        // Try to send all the entries in the journal but not exceeding the max data size
-                        // for a single AppendEntries message.
-                        int maxEntries = (int) context.getReplicatedLog().size();
-                        entries = context.getReplicatedLog().getFrom(followerNextIndex, maxEntries,
-                                context.getConfigParams().getSnapshotChunkSize());
+                        entries = getEntriesToSend(followerLogInformation, followerActor);
                         sendAppendEntries = true;
                     }
                 } else if (isFollowerActive && followerNextIndex >= 0
@@ -705,6 +725,76 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         }
     }
 
+    private List<ReplicatedLogEntry> getEntriesToSend(FollowerLogInformation followerLogInfo,
+            ActorSelection followerActor) {
+        // Try to get all the entries in the journal but not exceeding the max data size for a single AppendEntries
+        // message.
+        int maxEntries = (int) context.getReplicatedLog().size();
+        final int maxDataSize = context.getConfigParams().getSnapshotChunkSize();
+        final long followerNextIndex = followerLogInfo.getNextIndex();
+        List<ReplicatedLogEntry> entries = context.getReplicatedLog().getFrom(followerNextIndex,
+                maxEntries, maxDataSize);
+
+        // If the first entry's size exceeds the max data size threshold, it will be returned from the call above. If
+        // that is the case, then we need to slice it into smaller chunks.
+        if (!(entries.size() == 1 && entries.get(0).getData().size() > maxDataSize)) {
+            // Don't need to slice.
+            return entries;
+        }
+
+        log.debug("{}: Log entry size {} exceeds max payload size {}", logName(), entries.get(0).getData().size(),
+                maxDataSize);
+
+        // If an AppendEntries has already been serialized for the log index then reuse the
+        // SharedFileBackedOutputStream.
+        final Long logIndex = entries.get(0).getIndex();
+        SharedFileBackedOutputStream fileBackedStream = sharedSerializedAppendEntriesStreams.get(logIndex);
+        if (fileBackedStream == null) {
+            fileBackedStream = context.getFileBackedOutputStreamFactory().newSharedInstance();
+
+            final AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
+                    getLogEntryIndex(followerNextIndex - 1), getLogEntryTerm(followerNextIndex - 1), entries,
+                    context.getCommitIndex(), getReplicatedToAllIndex(), context.getPayloadVersion());
+
+            log.debug("{}: Serializing {} for slicing for follower {}", logName(), appendEntries,
+                    followerLogInfo.getId());
+
+            try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
+                out.writeObject(appendEntries);
+            } catch (IOException e) {
+                log.error("{}: Error serializing {}", logName(), appendEntries, e);
+                fileBackedStream.cleanup();
+                return Collections.emptyList();
+            }
+
+            sharedSerializedAppendEntriesStreams.put(logIndex, fileBackedStream);
+
+            fileBackedStream.setOnCleanupCallback(index -> {
+                log.debug("{}: On SharedFileBackedOutputStream cleanup for index {}", logName(), index);
+                sharedSerializedAppendEntriesStreams.remove(index);
+            }, logIndex);
+        } else {
+            log.debug("{}: Reusing SharedFileBackedOutputStream for follower {}", logName(), followerLogInfo.getId());
+            fileBackedStream.incrementUsageCount();
+        }
+
+        log.debug("{}: Slicing stream for index {}, follower {}", logName(), logIndex, followerLogInfo.getId());
+
+        // Record that slicing is in progress for the follower.
+        followerLogInfo.setSlicedLogEntryIndex(logIndex);
+
+        final FollowerIdentifier identifier = new FollowerIdentifier(followerLogInfo.getId());
+        appendEntriesMessageSlicer.slice(SliceOptions.builder().identifier(identifier)
+                .fileBackedOutputStream(fileBackedStream).sendTo(followerActor).replyTo(actor())
+                .onFailureCallback(failure -> {
+                    log.error("{}: Error slicing AppendEntries for follower {}", logName(),
+                            followerLogInfo.getId(), failure);
+                    followerLogInfo.setSlicedLogEntryIndex(FollowerLogInformation.NO_INDEX);
+                }).build());
+
+        return Collections.emptyList();
+    }
+
     private void sendAppendEntriesToFollower(ActorSelection followerActor, List<ReplicatedLogEntry> entries,
             FollowerLogInformation followerLogInformation) {
         // In certain cases outlined below we don't want to send the actual commit index to prevent the follower from
@@ -714,9 +804,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         //       empty AppendEntries as a heart beat to prevent election.
         //     - if we're in the process of installing a snapshot. In this case we don't send any new entries but still
         //       need to send AppendEntries to prevent election.
+        //     - if we're in the process of slicing an AppendEntries with a large log entry payload. In this case we
+        //       need to send an empty AppendEntries to prevent election.
         boolean isInstallingSnaphot = followerLogInformation.getInstallSnapshotState() != null;
-        long leaderCommitIndex = isInstallingSnaphot || !followerLogInformation.isFollowerActive() ? -1 :
-            context.getCommitIndex();
+        long leaderCommitIndex = isInstallingSnaphot || followerLogInformation.isLogEntrySlicingInProgress()
+                || !followerLogInformation.isFollowerActive() ? -1 : context.getCommitIndex();
 
         long followerNextIndex = followerLogInformation.getNextIndex();
         AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
@@ -858,6 +950,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         if (!followerToLog.isEmpty()) {
             log.trace("{}: Sending heartbeat", logName());
             sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis(), true);
+
+            appendEntriesMessageSlicer.checkExpiredSlicedMessageState();
         }
     }
 
@@ -889,6 +983,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     @Override
     public void close() {
         stopHeartBeat();
+        appendEntriesMessageSlicer.close();
     }
 
     @Override
index 1587719..9f0e032 100644 (file)
@@ -26,6 +26,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.messaging.MessageAssembler;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
@@ -56,6 +57,8 @@ public class Follower extends AbstractRaftActorBehavior {
 
     private final SyncStatusTracker initialSyncStatusTracker;
 
+    private final MessageAssembler appendEntriesMessageAssembler;
+
     private final Stopwatch lastLeaderMessageTimer = Stopwatch.createStarted();
     private SnapshotTracker snapshotTracker = null;
     private String leaderId;
@@ -74,6 +77,10 @@ public class Follower extends AbstractRaftActorBehavior {
         initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), context.getConfigParams()
             .getSyncIndexThreshold());
 
+        appendEntriesMessageAssembler = MessageAssembler.builder().logContext(logName())
+                .filedBackedStreamFactory(context.getFileBackedOutputStreamFactory())
+                .assembledMessageCallback((message, sender) -> handleMessage(sender, message)).build();
+
         if (context.getPeerIds().isEmpty() && getLeaderId() == null) {
             actor().tell(TimeoutNow.INSTANCE, actor());
         } else {
@@ -315,6 +322,8 @@ public class Follower extends AbstractRaftActorBehavior {
             super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex());
         }
 
+        appendEntriesMessageAssembler.checkExpiredAssembledMessageState();
+
         return this;
     }
 
@@ -396,6 +405,10 @@ public class Follower extends AbstractRaftActorBehavior {
             return handleElectionTimeout(message);
         }
 
+        if (appendEntriesMessageAssembler.handleMessage(message, actor())) {
+            return this;
+        }
+
         if (!(message instanceof RaftRPC)) {
             // The rest of the processing requires the message to be a RaftRPC
             return null;
@@ -596,6 +609,7 @@ public class Follower extends AbstractRaftActorBehavior {
     public void close() {
         closeSnapshotTracker();
         stopElection();
+        appendEntriesMessageAssembler.close();
     }
 
     @VisibleForTesting
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerIdentifier.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerIdentifier.java
new file mode 100644 (file)
index 0000000..32c6da4
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.yangtools.util.AbstractStringIdentifier;
+
+/**
+ * An Identifier for a follower.
+ *
+ * @author Thomas Pantelis
+ */
+class FollowerIdentifier extends AbstractStringIdentifier<FollowerIdentifier> {
+    private static final long serialVersionUID = 1L;
+
+    FollowerIdentifier(String followerId) {
+        super(followerId);
+    }
+
+    private Object writeReplace() {
+        return new Proxy(this);
+    }
+
+    private static class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private FollowerIdentifier identifier;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+        }
+
+        Proxy(FollowerIdentifier identifier) {
+            this.identifier = identifier;
+        }
+
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeObject(identifier.getValue());
+        }
+
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            identifier = new FollowerIdentifier((String) in.readObject());
+        }
+
+        private Object readResolve() {
+            return identifier;
+        }
+    }
+}
index 6b7e037..f8969fc 100644 (file)
@@ -36,7 +36,7 @@ class SnapshotTracker implements AutoCloseable {
         this.log = log;
         this.totalChunks = totalChunks;
         this.leaderId = Preconditions.checkNotNull(leaderId);
-        fileBackedStream = context.newFileBackedOutputStream();
+        fileBackedStream = context.getFileBackedOutputStreamFactory().newInstance();
         bufferedStream = new BufferedOutputStream(fileBackedStream);
     }
 
index 82c049d..e9fc53a 100644 (file)
@@ -107,6 +107,29 @@ public final class ServerConfigurationPayload extends Payload implements Persist
         return serializedSize;
     }
 
+    @Override
+    public int hashCode() {
+        return serverConfig.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+
+        if (obj == null) {
+            return false;
+        }
+
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+
+        ServerConfigurationPayload other = (ServerConfigurationPayload) obj;
+        return serverConfig.equals(other.serverConfig);
+    }
+
     @Override
     public String toString() {
         return "ServerConfigurationPayload [serverConfig=" + serverConfig + "]";
index 5959df7..c50a108 100644 (file)
@@ -216,6 +216,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
     protected long currentTerm;
 
     protected int snapshotBatchCount = 4;
+    protected int snapshotChunkSize = SNAPSHOT_CHUNK_SIZE;
 
     protected List<MockPayload> expSnapshotState = new ArrayList<>();
 
@@ -233,7 +234,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         configParams.setSnapshotBatchCount(snapshotBatchCount);
         configParams.setSnapshotDataThresholdPercentage(70);
         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
-        configParams.setSnapshotChunkSize(SNAPSHOT_CHUNK_SIZE);
+        configParams.setSnapshotChunkSize(snapshotChunkSize);
         return configParams;
     }
 
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationWithSlicedPayloadIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationWithSlicedPayloadIntegrationTest.java
new file mode 100644 (file)
index 0000000..ea6de41
--- /dev/null
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+
+/**
+ * Tests end-to-end replication of sliced log entry payloads, ie entries whose size exceeds the maximum size for a
+ * single AppendEntries message.
+ *
+ * @author Thomas Pantelis
+ */
+public class ReplicationWithSlicedPayloadIntegrationTest extends AbstractRaftActorIntegrationTest {
+
+    @Test
+    public void runTest() throws Exception {
+        testLog.info("ReplicationWithSlicedPayloadIntegrationTest starting");
+
+        // Create the leader and 2 follower actors.
+
+        snapshotChunkSize = 20;
+
+        DefaultConfigParamsImpl followerConfigParams = newFollowerConfigParams();
+        followerConfigParams.setSnapshotBatchCount(snapshotBatchCount);
+        follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+                follower2Id, testActorPath(follower2Id)), followerConfigParams);
+
+        follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+                follower1Id, testActorPath(follower1Id)), followerConfigParams);
+
+        peerAddresses = ImmutableMap.<String, String>builder()
+                .put(follower1Id, follower1Actor.path().toString())
+                .put(follower2Id, follower2Actor.path().toString()).build();
+
+        leaderConfigParams = newLeaderConfigParams();
+        leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
+
+        follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
+        follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
+        leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
+
+        leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+
+        waitUntilLeader(leaderActor);
+
+        currentTerm = leaderContext.getTermInformation().getCurrentTerm();
+
+        // Send a large payload that exceeds the size threshold and needs to be sliced.
+
+        MockPayload largePayload = sendPayloadData(leaderActor, "large", snapshotChunkSize + 1);
+
+        // Then send a small payload that does not need to be sliced.
+
+        MockPayload smallPayload = sendPayloadData(leaderActor, "normal", snapshotChunkSize - 1);
+
+        final List<ApplyState> leaderApplyState = expectMatching(leaderCollectorActor, ApplyState.class, 2);
+        verifyApplyState(leaderApplyState.get(0), leaderCollectorActor,
+                largePayload.toString(), currentTerm, 0, largePayload);
+        verifyApplyState(leaderApplyState.get(1), leaderCollectorActor,
+                smallPayload.toString(), currentTerm, 1, smallPayload);
+
+        final List<ApplyState> follower1ApplyState = expectMatching(follower1CollectorActor, ApplyState.class, 2);
+        verifyApplyState(follower1ApplyState.get(0), null, null, currentTerm, 0, largePayload);
+        verifyApplyState(follower1ApplyState.get(1), null, null, currentTerm, 1, smallPayload);
+
+        final List<ApplyState> follower2ApplyState = expectMatching(follower2CollectorActor, ApplyState.class, 2);
+        verifyApplyState(follower2ApplyState.get(0), null, null, currentTerm, 0, largePayload);
+        verifyApplyState(follower2ApplyState.get(1), null, null, currentTerm, 1, smallPayload);
+
+        testLog.info("ReplicationWithSlicedPayloadIntegrationTest ending");
+    }
+}
index 3f96485..fdfaa04 100644 (file)
@@ -38,7 +38,7 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
 import org.opendaylight.controller.cluster.raft.SnapshotManager.LastAppliedTermInformationReader;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -97,7 +97,8 @@ public class SnapshotManagerTest extends AbstractActorTest {
         doReturn(5L).when(mockElectionTerm).getCurrentTerm();
         doReturn("member5").when(mockElectionTerm).getVotedFor();
 
-        doReturn(new FileBackedOutputStream(10000000, "target")).when(mockRaftActorContext).newFileBackedOutputStream();
+        doReturn(new FileBackedOutputStreamFactory(10000000, "target"))
+                .when(mockRaftActorContext).getFileBackedOutputStreamFactory();
 
         snapshotManager = new SnapshotManager(mockRaftActorContext, LoggerFactory.getLogger(this.getClass()));
         factory = new TestActorFactory(getSystem());
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerIdentifierTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerIdentifierTest.java
new file mode 100644 (file)
index 0000000..8f0b613
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+
+/**
+ * Unit tests for FollowerIdentifier.
+ *
+ * @author Thomas Pantelis
+ */
+public class FollowerIdentifierTest {
+
+    @Test
+    public void testSerialization() throws FileNotFoundException, IOException {
+        FollowerIdentifier expected = new FollowerIdentifier("follower1");
+        FollowerIdentifier cloned = (FollowerIdentifier) SerializationUtils.clone(expected);
+        assertEquals("cloned", expected, cloned);
+    }
+}
index a8ecf71..930c196 100644 (file)
@@ -39,8 +39,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.SerializationUtils;
 import org.junit.After;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.messaging.MessageSlice;
+import org.opendaylight.controller.cluster.messaging.MessageSliceReply;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
@@ -70,6 +73,7 @@ import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEnt
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.yangtools.concepts.Identifier;
@@ -160,7 +164,10 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
-        MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
+        return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
+    }
+
+    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index, Payload payload) {
         SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
         actorContext.getReplicatedLog().append(newEntry);
         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
@@ -2230,6 +2237,145 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
     }
 
+    @Test
+    public void testReplicationWithPayloadSizeThatExceedsThreshold() {
+        logStart("testReplicationWithPayloadSizeThatExceedsThreshold");
+
+        final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1,
+                Arrays.asList(new SimpleReplicatedLogEntry(0, 1,
+                        new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length;
+        final MockRaftActorContext.MockPayload largePayload =
+                new MockRaftActorContext.MockPayload("large", serializedSize);
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(300, TimeUnit.MILLISECONDS));
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50);
+        leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+        leaderActorContext.setCommitIndex(-1);
+        leaderActorContext.setLastApplied(-1);
+
+        leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
+
+        // Send initial heartbeat reply so follower is marked active
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Send normal payload first to prime commit index.
+        final long term = leaderActorContext.getTermInformation().getCurrentTerm();
+        sendReplicate(leaderActorContext, term, 0);
+
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("Entries size", 1, appendEntries.getEntries().size());
+        assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex());
+
+        leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0));
+        assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex());
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced.
+        sendReplicate(leaderActorContext, term, 1, largePayload);
+
+        MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+        assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex());
+        assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices());
+
+        final Identifier slicingId = messageSlice.getIdentifier();
+
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
+        assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+        assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+        assertEquals("Entries size", 0, appendEntries.getEntries().size());
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress.
+
+        // Sleep for the heartbeat interval so AppendEntries is sent.
+        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+                .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+        assertEquals("Entries size", 0, appendEntries.getEntries().size());
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Simulate the MessageSliceReply's and AppendEntriesReply from the follower.
+
+        leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor));
+        messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+        assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex());
+
+        leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor));
+
+        leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0));
+
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Send another normal payload.
+
+        sendReplicate(leaderActorContext, term, 2);
+
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("Entries size", 1, appendEntries.getEntries().size());
+        assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit());
+    }
+
+    @Test
+    public void testLargePayloadSlicingExpiration() {
+        logStart("testLargePayloadSlicingExpiration");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(100, TimeUnit.MILLISECONDS));
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10);
+        leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+        leaderActorContext.setCommitIndex(-1);
+        leaderActorContext.setLastApplied(-1);
+
+        final long term = leaderActorContext.getTermInformation().getCurrentTerm();
+        leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
+
+        // Send initial heartbeat reply so follower is marked active
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
+        MessageCollectorActor.clearMessages(followerActor);
+
+        sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large",
+                leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1));
+        MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+
+        // Sleep for at least 3 * election timeout so the slicing state expires.
+        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+                .getElectionTimeOutInterval().toMillis() * 3  + 50, TimeUnit.MILLISECONDS);
+        MessageCollectorActor.clearMessages(followerActor);
+
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+        assertEquals("Entries size", 0, appendEntries.getEntries().size());
+
+        MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300);
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Send an AppendEntriesReply - this should restart the slicing.
+
+        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+                .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS);
+
+        leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0));
+
+        MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+    }
+
     @Override
     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
             ActorRef actorRef, RaftRPC rpc) throws Exception {
index f52d8e4..658e2ca 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
@@ -27,6 +28,7 @@ import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +60,9 @@ public class SnapshotTrackerTest {
         chunk3 = getNextChunk(byteString, 20, byteString.size());
 
         fbos = spy(new FileBackedOutputStream(100000000, "target"));
-        doReturn(fbos).when(mockContext).newFileBackedOutputStream();
+        FileBackedOutputStreamFactory mockFactory = mock(FileBackedOutputStreamFactory.class);
+        doReturn(fbos).when(mockFactory).newInstance();
+        doReturn(mockFactory).when(mockContext).getFileBackedOutputStreamFactory();
     }
 
     @Test
index 0cd4be6..7594883 100644 (file)
@@ -39,4 +39,13 @@ public class FileBackedOutputStreamFactory {
     public FileBackedOutputStream newInstance() {
         return new FileBackedOutputStream(fileThreshold, fileDirectory);
     }
+
+    /**
+     * Creates a new {@link SharedFileBackedOutputStream} with the settings configured for this factory.
+     *
+     * @return a {@link SharedFileBackedOutputStream} instance
+     */
+    public SharedFileBackedOutputStream newSharedInstance() {
+        return new SharedFileBackedOutputStream(fileThreshold, fileDirectory);
+    }
 }
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/SharedFileBackedOutputStream.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/SharedFileBackedOutputStream.java
new file mode 100644 (file)
index 0000000..852473b
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.io;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+/**
+ * A FileBackedOutputStream that allows for sharing in that it maintains a usage count and the backing file isn't
+ * deleted until the usage count reaches 0. The usage count is initialized to 1 on construction. Subsequent users of
+ * the instance must call {@link #incrementUsageCount()}. The {@link #cleanup()} method decrements the usage count and,
+ * when it reaches 0, the {@link FileBackedOutputStream#cleanup()} is called to delete the backing file.
+ *
+ * @author Thomas Pantelis
+ */
+public class SharedFileBackedOutputStream extends FileBackedOutputStream {
+    private final AtomicInteger usageCount = new AtomicInteger(1);
+    @SuppressWarnings("rawtypes")
+    private Consumer onCleanupCallback;
+    private Object onCleanupContext;
+
+    public SharedFileBackedOutputStream(int fileThreshold, String fileDirectory) {
+        super(fileThreshold, fileDirectory);
+    }
+
+    /**
+     * Increments the usage count. This must be followed by a corresponding call to {@link #cleanup()} when this
+     * instance is no longer needed.
+     */
+    public void incrementUsageCount() {
+        usageCount.getAndIncrement();
+    }
+
+    /**
+     * Returns the current usage count.
+     *
+     * @return the current usage count
+     */
+    public int getUsageCount() {
+        return usageCount.get();
+    }
+
+    /**
+     * Sets the callback to be notified when {@link FileBackedOutputStream#cleanup()} is called to delete the backing
+     * file.
+     */
+    public <T> void setOnCleanupCallback(Consumer<T> callback, T context) {
+        onCleanupCallback = callback;
+        onCleanupContext = context;
+    }
+
+    /**
+     * Overridden to decrement the usage count.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void cleanup() {
+        Preconditions.checkState(usageCount.get() > 0);
+
+        if (usageCount.decrementAndGet() == 0) {
+            super.cleanup();
+
+            if (onCleanupCallback != null) {
+                onCleanupCallback.accept(onCleanupContext);
+            }
+        }
+    }
+}
index bcb1850..a7abf7c 100644 (file)
@@ -40,27 +40,24 @@ public class FileBackedOutputStreamTest {
 
     @BeforeClass
     public static void staticSetup() {
-        File dir = new File(TEMP_DIR);
-        if (!dir.exists() && !dir.mkdirs()) {
-            throw new RuntimeException("Failed to create temp dir " + TEMP_DIR);
-        }
+        createDir(TEMP_DIR);
     }
 
     @AfterClass
     public static void staticCleanup() {
-        deleteTempFiles();
+        deleteTempFiles(TEMP_DIR);
         deleteFile(TEMP_DIR);
     }
 
     @Before
     public void setup() {
-        deleteTempFiles();
+        deleteTempFiles(TEMP_DIR);
         FileBackedOutputStream.REFERENCE_CACHE.clear();
     }
 
     @After
     public void cleanup() {
-        deleteTempFiles();
+        deleteTempFiles(TEMP_DIR);
     }
 
     @Test
@@ -72,7 +69,7 @@ public class FileBackedOutputStreamTest {
             fbos.write(bytes, 1, bytes.length - 1);
 
             assertEquals("getCount", bytes.length, fbos.getCount());
-            assertNull("Found unexpected temp file", findTempFileName());
+            assertNull("Found unexpected temp file", findTempFileName(TEMP_DIR));
             assertEquals("Size", bytes.length, fbos.asByteSource().size());
 
             // Read bytes twice.
@@ -95,13 +92,13 @@ public class FileBackedOutputStreamTest {
             fbos.write(bytes[0]);
             fbos.write(bytes, 1, 11);
 
-            String tempFileName = findTempFileName();
+            String tempFileName = findTempFileName(TEMP_DIR);
             assertNotNull("Expected temp file created", tempFileName);
 
             fbos.write(bytes[12]);
             fbos.write(bytes, 13, bytes.length - 13);
 
-            assertEquals("Temp file", tempFileName, findTempFileName());
+            assertEquals("Temp file", tempFileName, findTempFileName(TEMP_DIR));
             assertEquals("Size", bytes.length, fbos.asByteSource().size());
 
             InputStream inputStream = fbos.asByteSource().openStream();
@@ -121,7 +118,7 @@ public class FileBackedOutputStreamTest {
 
             assertEquals("Reference cache size", 0, FileBackedOutputStream.REFERENCE_CACHE.size());
 
-            assertNull("Found unexpected temp file", findTempFileName());
+            assertNull("Found unexpected temp file", findTempFileName(TEMP_DIR));
         }
 
         LOG.info("testFileThresholdReachedWithWriteBytes ending");
@@ -135,12 +132,12 @@ public class FileBackedOutputStreamTest {
             fbos.write(bytes[0]);
             fbos.write(bytes[1]);
 
-            assertNull("Found unexpected temp file", findTempFileName());
+            assertNull("Found unexpected temp file", findTempFileName(TEMP_DIR));
 
             fbos.write(bytes[2]);
             fbos.flush();
 
-            assertNotNull("Expected temp file created", findTempFileName());
+            assertNotNull("Expected temp file created", findTempFileName(TEMP_DIR));
 
             assertEquals("Size", bytes.length, fbos.asByteSource().size());
             assertArrayEquals("Read bytes", bytes, fbos.asByteSource().read());
@@ -156,7 +153,7 @@ public class FileBackedOutputStreamTest {
             byte[] bytes = new byte[]{0, 1, 2};
             fbos.write(bytes);
 
-            assertNull("Found unexpected temp file", findTempFileName());
+            assertNull("Found unexpected temp file", findTempFileName(TEMP_DIR));
             assertEquals("Size", bytes.length, fbos.asByteSource().size());
 
             // Should throw IOException after call to asByteSource.
@@ -172,7 +169,7 @@ public class FileBackedOutputStreamTest {
         try {
             fbos = new FileBackedOutputStream(1, TEMP_DIR);
             fbos.write(new byte[] {0, 1});
-            assertNotNull("Expected temp file created", findTempFileName());
+            assertNotNull("Expected temp file created", findTempFileName(TEMP_DIR));
         } finally {
             if (fbos != null) {
                 fbos.close();
@@ -183,7 +180,7 @@ public class FileBackedOutputStreamTest {
         Stopwatch sw = Stopwatch.createStarted();
         while (sw.elapsed(TimeUnit.SECONDS) <= 20) {
             System.gc();
-            if (findTempFileName() == null) {
+            if (findTempFileName(TEMP_DIR) == null) {
                 return;
             }
             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
@@ -192,23 +189,30 @@ public class FileBackedOutputStreamTest {
         fail("Temp file was not deleted");
     }
 
-    private static String findTempFileName() {
-        String[] files = new File(TEMP_DIR).list();
+    static String findTempFileName(String dirPath) {
+        String[] files = new File(dirPath).list();
         assertNotNull(files);
         assertTrue("Found more than one temp file: " + Arrays.toString(files), files.length < 2);
         return files.length == 1 ? files[0] : null;
     }
 
-    private static boolean deleteFile(String file) {
+    static boolean deleteFile(String file) {
         return new File(file).delete();
     }
 
-    private static void deleteTempFiles() {
-        String[] files = new File(TEMP_DIR).list();
+    static void deleteTempFiles(String path) {
+        String[] files = new File(path).list();
         if (files != null) {
             for (String file: files) {
-                deleteFile(TEMP_DIR + File.separator + file);
+                deleteFile(path + File.separator + file);
             }
         }
     }
+
+    static void createDir(String path) {
+        File dir = new File(path);
+        if (!dir.exists() && !dir.mkdirs()) {
+            throw new RuntimeException("Failed to create temp dir " + path);
+        }
+    }
 }
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/io/SharedFileBackedOutputStreamTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/io/SharedFileBackedOutputStreamTest.java
new file mode 100644 (file)
index 0000000..0d6f581
--- /dev/null
@@ -0,0 +1,109 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.io;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for SharedFileBackedOutputStream.
+ *
+ * @author Thomas Pantelis
+ */
+public class SharedFileBackedOutputStreamTest {
+    private static final Logger LOG = LoggerFactory.getLogger(SharedFileBackedOutputStreamTest.class);
+    private static final String TEMP_DIR = "target/FileBackedOutputStreamTest";
+
+    @BeforeClass
+    public static void staticSetup() {
+        FileBackedOutputStreamTest.createDir(TEMP_DIR);
+    }
+
+    @AfterClass
+    public static void staticCleanup() {
+        FileBackedOutputStreamTest.deleteTempFiles(TEMP_DIR);
+        FileBackedOutputStreamTest.deleteFile(TEMP_DIR);
+    }
+
+    @Before
+    public void setup() {
+        FileBackedOutputStreamTest.deleteTempFiles(TEMP_DIR);
+        FileBackedOutputStream.REFERENCE_CACHE.clear();
+    }
+
+    @After
+    public void cleanup() {
+        FileBackedOutputStreamTest.deleteTempFiles(TEMP_DIR);
+    }
+
+    @Test
+    public void testSingleUsage() throws IOException {
+        LOG.info("testSingleUsage starting");
+        try (SharedFileBackedOutputStream fbos = new SharedFileBackedOutputStream(5, TEMP_DIR)) {
+            byte[] bytes = new byte[]{0, 1, 2, 3, 4, 5, 6};
+            fbos.write(bytes);
+
+            assertNotNull("Expected temp file created", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR));
+            fbos.cleanup();
+            assertNull("Found unexpected temp file", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR));
+        }
+
+        LOG.info("testSingleUsage ending");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testSharing() throws IOException {
+        LOG.info("testSharing starting");
+        try (SharedFileBackedOutputStream fbos = new SharedFileBackedOutputStream(5, TEMP_DIR)) {
+            String context = "context";
+            Consumer<String> mockCallback = Mockito.mock(Consumer.class);
+            fbos.setOnCleanupCallback(mockCallback , context);
+
+            byte[] bytes = new byte[]{0, 1, 2, 3, 4, 5, 6};
+            fbos.write(bytes);
+
+            assertNotNull("Expected temp file created", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR));
+
+            fbos.incrementUsageCount();
+            fbos.cleanup();
+            assertNotNull("Expected temp file exists", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR));
+
+            fbos.incrementUsageCount();
+            fbos.incrementUsageCount();
+
+            fbos.cleanup();
+            assertNotNull("Expected temp file exists", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR));
+
+            fbos.cleanup();
+            assertNotNull("Expected temp file exists", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR));
+
+            verify(mockCallback, never()).accept(context);
+
+            fbos.cleanup();
+            assertNull("Found unexpected temp file", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR));
+
+            verify(mockCallback).accept(context);
+        }
+
+        LOG.info("testSharing ending");
+    }
+}