Bug 2890: Chunk AppendEntries when single payload size exceeds threshold 01/57301/12
authorTom Pantelis <tompantelis@gmail.com>
Thu, 18 May 2017 13:23:04 +0000 (09:23 -0400)
committerRobert Varga <nite@hq.sk>
Thu, 29 Jun 2017 15:48:09 +0000 (15:48 +0000)
Utilizes the MessageSlicer in AbstractLeader to slice/chunk AppendEntries
messages whose single log entry payloas exceeds the max size threshold.
The MessageAssembler is used in the Follower to re-assemble.

For efficiency, with multiple followers, the AbstractLeader reuses the
FileBackedOutputStream containing the serialized AppendEntries data.
However, since the MessageSlicer takes ownership of the FileBackedOutputStream
and cleans it up when slicing is complete, I added a
SharedFileBackedOutputStream class that maintains a usage count and
performs cleanup when the usage count reaches 0. The AbstractLeader maintains
a Map of SharedFileBackedOutputStream instances keyed by log index.

The FollowerLogInformation keeps track of whether or not slicing is in
progress for the follower. Same as with install snapshot, we only want to send
empty AppendEntries as heartbeats.

Change-Id: Id163944b9989f6cb39a6aaaa98d1f3c4b0026bbe
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
20 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerIdentifier.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/ServerConfigurationPayload.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationWithSlicedPayloadIntegrationTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerIdentifierTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamFactory.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/SharedFileBackedOutputStream.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamTest.java
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/io/SharedFileBackedOutputStreamTest.java [new file with mode: 0644]

index 123d206d05b9e20742a3c45aba6b2f5bf692d09d..6f9efbbfaca4b2f5a79cca2e7515980b6f2437a5 100644 (file)
@@ -16,6 +16,7 @@ import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotS
  * The state of the followers log as known by the Leader.
  */
 public interface FollowerLogInformation {
+    long NO_INDEX = -1;
 
     /**
      * Increments the value of the follower's next index.
@@ -160,4 +161,19 @@ public interface FollowerLogInformation {
      * Clears the LeaderInstallSnapshotState when an install snapshot is complete.
      */
     void clearLeaderInstallSnapshotState();
+
+    /**
+     * Sets the index of the log entry whose payload size exceeds the maximum size for a single message and thus
+     * needs to be sliced into smaller chunks.
+     *
+     * @param index the log entry index or NO_INDEX to clear it
+     */
+    void setSlicedLogEntryIndex(long index);
+
+    /**
+     * Return whether or not log entry slicing is currently in progress.
+     *
+     * @return true if slicing is currently in progress, false otherwise
+     */
+    boolean isLogEntrySlicingInProgress();
 }
index 883bfbb4e4184f0cbf0843002dfa3498ca13f622..8bd0cf3d34f4639a1e053cc6c17158e469a4422f 100644 (file)
@@ -46,6 +46,8 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     private LeaderInstallSnapshotState installSnapshotState;
 
+    private long slicedLogEntryIndex = NO_INDEX;
+
     /**
      * Constructs an instance.
      *
@@ -92,6 +94,12 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     @Override
     public boolean setMatchIndex(long matchIndex) {
+        // If the new match index is the index of the entry currently being sliced, then we know slicing is complete
+        // and the follower received the entry and responded so clear the slicedLogEntryIndex
+        if (isLogEntrySlicingInProgress() && slicedLogEntryIndex == matchIndex) {
+            slicedLogEntryIndex = NO_INDEX;
+        }
+
         if (this.matchIndex != matchIndex) {
             this.matchIndex = matchIndex;
             return true;
@@ -210,6 +218,16 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
         installSnapshotState = null;
     }
 
+    @Override
+    public void setSlicedLogEntryIndex(long index) {
+        slicedLogEntryIndex  = index;
+    }
+
+    @Override
+    public boolean isLogEntrySlicingInProgress() {
+        return slicedLogEntryIndex != NO_INDEX;
+    }
+
     @Override
     public String toString() {
         return "FollowerLogInformationImpl [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
index 0fe6cf1e2fecbfd1529a85bc50a1d769fdcd6d9d..aed050c3c04ee4e15afa9bb907d9887d556bb33f 100644 (file)
@@ -21,7 +21,7 @@ import java.util.function.LongSupplier;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
@@ -329,12 +329,12 @@ public interface RaftActorContext {
     Consumer<ApplyState> getApplyStateConsumer();
 
     /**
-     * Creates a FileBackedOutputStream with a common configuration.
+     * Returns the {@link FileBackedOutputStreamFactory} instance with a common configuration.
      *
-     * @return a FileBackedOutputStream instance
+     * @return the {@link FileBackedOutputStreamFactory};
      */
     @Nonnull
-    FileBackedOutputStream newFileBackedOutputStream();
+    FileBackedOutputStreamFactory getFileBackedOutputStreamFactory();
 
     /**
      * Returns the RaftActorLeadershipTransferCohort if leadership transfer is in progress.
index 5b130db21046bb6d5d7c28f30cdaa57c9303f969..4fd5666ae207a77143f5fc3808c04d8e4417db34 100644 (file)
@@ -29,7 +29,7 @@ import java.util.function.LongSupplier;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
@@ -89,6 +89,8 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private final Consumer<ApplyState> applyStateConsumer;
 
+    private final FileBackedOutputStreamFactory fileBackedOutputStreamFactory;
+
     private RaftActorLeadershipTransferCohort leadershipTransferCohort;
 
     public RaftActorContextImpl(ActorRef actor, ActorContext context, String id,
@@ -107,6 +109,9 @@ public class RaftActorContextImpl implements RaftActorContext {
         this.log = Preconditions.checkNotNull(logger);
         this.applyStateConsumer = Preconditions.checkNotNull(applyStateConsumer);
 
+        fileBackedOutputStreamFactory = new FileBackedOutputStreamFactory(
+                configParams.getFileBackedStreamingThreshold(), configParams.getTempFileDirectory());
+
         for (Map.Entry<String, String> e: Preconditions.checkNotNull(peerAddresses).entrySet()) {
             peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING));
         }
@@ -404,9 +409,8 @@ public class RaftActorContextImpl implements RaftActorContext {
     }
 
     @Override
-    public FileBackedOutputStream newFileBackedOutputStream() {
-        return new FileBackedOutputStream(configParams.getFileBackedStreamingThreshold(),
-                configParams.getTempFileDirectory());
+    public FileBackedOutputStreamFactory getFileBackedOutputStreamFactory() {
+        return fileBackedOutputStreamFactory;
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
index 71ef8ffcea04360b6db573de04d5344d44b3cf7c..7ff47f93d93e0ede5a1e47608aa80be57410f099 100644 (file)
@@ -278,7 +278,7 @@ public class SnapshotManager implements SnapshotState {
 
             OutputStream installSnapshotStream = null;
             if (targetFollower != null) {
-                installSnapshotStream = context.newFileBackedOutputStream();
+                installSnapshotStream = context.getFileBackedOutputStreamFactory().newInstance();
                 log.info("{}: Initiating snapshot capture {} to install on {}",
                         persistenceId(), captureSnapshot, targetFollower);
             } else {
index 097f0ec677ea52355d99a1256c3a219ccf522869..9fc7b3393cbe1f86404e2eb9a08a9ca6f6e21bc1 100644 (file)
@@ -17,6 +17,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.io.ByteSource;
 import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -26,7 +27,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.io.SharedFileBackedOutputStream;
+import org.opendaylight.controller.cluster.messaging.MessageSlicer;
+import org.opendaylight.controller.cluster.messaging.SliceOptions;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
@@ -85,6 +90,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      */
     private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
 
+    /**
+     * Map of serialized AppendEntries output streams keyed by log index. This is used in conjunction with the
+     * appendEntriesMessageSlicer for slicing single ReplicatedLogEntry payloads that exceed the message size threshold.
+     * This Map allows the SharedFileBackedOutputStreams to be reused for multiple followers.
+     */
+    private final Map<Long, SharedFileBackedOutputStream> sharedSerializedAppendEntriesStreams = new HashMap<>();
+    private final MessageSlicer appendEntriesMessageSlicer;
+
     private Cancellable heartbeatSchedule = null;
     private Optional<SnapshotHolder> snapshotHolder = Optional.absent();
     private int minReplicationCount;
@@ -93,6 +106,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             @Nullable AbstractLeader initializeFromLeader) {
         super(context, state);
 
+        appendEntriesMessageSlicer = MessageSlicer.builder().logContext(logName())
+            .messageSliceSize(context.getConfigParams().getSnapshotChunkSize())
+            .expireStateAfterInactivity(context.getConfigParams().getElectionTimeOutInterval().toMillis() * 3,
+                    TimeUnit.MILLISECONDS).build();
+
         if (initializeFromLeader != null) {
             followerToLog.putAll(initializeFromLeader.followerToLog);
             snapshotHolder = initializeFromLeader.snapshotHolder;
@@ -438,6 +456,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
         Preconditions.checkNotNull(sender, "sender should not be null");
 
+        if (appendEntriesMessageSlicer.handleMessage(message)) {
+            return this;
+        }
+
         if (message instanceof RaftRPC) {
             RaftRPC rpc = (RaftRPC) message;
             // If RPC request or response contains term T > currentTerm:
@@ -644,6 +666,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     // we send a heartbeat even if we have not received a reply for the last chunk
                     sendAppendEntries = true;
                 }
+            } else if (followerLogInformation.isLogEntrySlicingInProgress()) {
+                sendAppendEntries = sendHeartbeat;
             } else {
                 long leaderLastIndex = context.getReplicatedLog().lastIndex();
                 long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
@@ -660,11 +684,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                             followerNextIndex, followerId);
 
                     if (followerLogInformation.okToReplicate()) {
-                        // Try to send all the entries in the journal but not exceeding the max data size
-                        // for a single AppendEntries message.
-                        int maxEntries = (int) context.getReplicatedLog().size();
-                        entries = context.getReplicatedLog().getFrom(followerNextIndex, maxEntries,
-                                context.getConfigParams().getSnapshotChunkSize());
+                        entries = getEntriesToSend(followerLogInformation, followerActor);
                         sendAppendEntries = true;
                     }
                 } else if (isFollowerActive && followerNextIndex >= 0
@@ -705,6 +725,76 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         }
     }
 
+    private List<ReplicatedLogEntry> getEntriesToSend(FollowerLogInformation followerLogInfo,
+            ActorSelection followerActor) {
+        // Try to get all the entries in the journal but not exceeding the max data size for a single AppendEntries
+        // message.
+        int maxEntries = (int) context.getReplicatedLog().size();
+        final int maxDataSize = context.getConfigParams().getSnapshotChunkSize();
+        final long followerNextIndex = followerLogInfo.getNextIndex();
+        List<ReplicatedLogEntry> entries = context.getReplicatedLog().getFrom(followerNextIndex,
+                maxEntries, maxDataSize);
+
+        // If the first entry's size exceeds the max data size threshold, it will be returned from the call above. If
+        // that is the case, then we need to slice it into smaller chunks.
+        if (!(entries.size() == 1 && entries.get(0).getData().size() > maxDataSize)) {
+            // Don't need to slice.
+            return entries;
+        }
+
+        log.debug("{}: Log entry size {} exceeds max payload size {}", logName(), entries.get(0).getData().size(),
+                maxDataSize);
+
+        // If an AppendEntries has already been serialized for the log index then reuse the
+        // SharedFileBackedOutputStream.
+        final Long logIndex = entries.get(0).getIndex();
+        SharedFileBackedOutputStream fileBackedStream = sharedSerializedAppendEntriesStreams.get(logIndex);
+        if (fileBackedStream == null) {
+            fileBackedStream = context.getFileBackedOutputStreamFactory().newSharedInstance();
+
+            final AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
+                    getLogEntryIndex(followerNextIndex - 1), getLogEntryTerm(followerNextIndex - 1), entries,
+                    context.getCommitIndex(), getReplicatedToAllIndex(), context.getPayloadVersion());
+
+            log.debug("{}: Serializing {} for slicing for follower {}", logName(), appendEntries,
+                    followerLogInfo.getId());
+
+            try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
+                out.writeObject(appendEntries);
+            } catch (IOException e) {
+                log.error("{}: Error serializing {}", logName(), appendEntries, e);
+                fileBackedStream.cleanup();
+                return Collections.emptyList();
+            }
+
+            sharedSerializedAppendEntriesStreams.put(logIndex, fileBackedStream);
+
+            fileBackedStream.setOnCleanupCallback(index -> {
+                log.debug("{}: On SharedFileBackedOutputStream cleanup for index {}", logName(), index);
+                sharedSerializedAppendEntriesStreams.remove(index);
+            }, logIndex);
+        } else {
+            log.debug("{}: Reusing SharedFileBackedOutputStream for follower {}", logName(), followerLogInfo.getId());
+            fileBackedStream.incrementUsageCount();
+        }
+
+        log.debug("{}: Slicing stream for index {}, follower {}", logName(), logIndex, followerLogInfo.getId());
+
+        // Record that slicing is in progress for the follower.
+        followerLogInfo.setSlicedLogEntryIndex(logIndex);
+
+        final FollowerIdentifier identifier = new FollowerIdentifier(followerLogInfo.getId());
+        appendEntriesMessageSlicer.slice(SliceOptions.builder().identifier(identifier)
+                .fileBackedOutputStream(fileBackedStream).sendTo(followerActor).replyTo(actor())
+                .onFailureCallback(failure -> {
+                    log.error("{}: Error slicing AppendEntries for follower {}", logName(),
+                            followerLogInfo.getId(), failure);
+                    followerLogInfo.setSlicedLogEntryIndex(FollowerLogInformation.NO_INDEX);
+                }).build());
+
+        return Collections.emptyList();
+    }
+
     private void sendAppendEntriesToFollower(ActorSelection followerActor, List<ReplicatedLogEntry> entries,
             FollowerLogInformation followerLogInformation) {
         // In certain cases outlined below we don't want to send the actual commit index to prevent the follower from
@@ -714,9 +804,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         //       empty AppendEntries as a heart beat to prevent election.
         //     - if we're in the process of installing a snapshot. In this case we don't send any new entries but still
         //       need to send AppendEntries to prevent election.
+        //     - if we're in the process of slicing an AppendEntries with a large log entry payload. In this case we
+        //       need to send an empty AppendEntries to prevent election.
         boolean isInstallingSnaphot = followerLogInformation.getInstallSnapshotState() != null;
-        long leaderCommitIndex = isInstallingSnaphot || !followerLogInformation.isFollowerActive() ? -1 :
-            context.getCommitIndex();
+        long leaderCommitIndex = isInstallingSnaphot || followerLogInformation.isLogEntrySlicingInProgress()
+                || !followerLogInformation.isFollowerActive() ? -1 : context.getCommitIndex();
 
         long followerNextIndex = followerLogInformation.getNextIndex();
         AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
@@ -858,6 +950,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         if (!followerToLog.isEmpty()) {
             log.trace("{}: Sending heartbeat", logName());
             sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis(), true);
+
+            appendEntriesMessageSlicer.checkExpiredSlicedMessageState();
         }
     }
 
@@ -889,6 +983,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     @Override
     public void close() {
         stopHeartBeat();
+        appendEntriesMessageSlicer.close();
     }
 
     @Override
index 15877194173cf330d85f221688ab890366033be4..9f0e0327ce16e109c33c28e0b7c0147aabe69194 100644 (file)
@@ -26,6 +26,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.messaging.MessageAssembler;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
@@ -56,6 +57,8 @@ public class Follower extends AbstractRaftActorBehavior {
 
     private final SyncStatusTracker initialSyncStatusTracker;
 
+    private final MessageAssembler appendEntriesMessageAssembler;
+
     private final Stopwatch lastLeaderMessageTimer = Stopwatch.createStarted();
     private SnapshotTracker snapshotTracker = null;
     private String leaderId;
@@ -74,6 +77,10 @@ public class Follower extends AbstractRaftActorBehavior {
         initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), context.getConfigParams()
             .getSyncIndexThreshold());
 
+        appendEntriesMessageAssembler = MessageAssembler.builder().logContext(logName())
+                .filedBackedStreamFactory(context.getFileBackedOutputStreamFactory())
+                .assembledMessageCallback((message, sender) -> handleMessage(sender, message)).build();
+
         if (context.getPeerIds().isEmpty() && getLeaderId() == null) {
             actor().tell(TimeoutNow.INSTANCE, actor());
         } else {
@@ -315,6 +322,8 @@ public class Follower extends AbstractRaftActorBehavior {
             super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex());
         }
 
+        appendEntriesMessageAssembler.checkExpiredAssembledMessageState();
+
         return this;
     }
 
@@ -396,6 +405,10 @@ public class Follower extends AbstractRaftActorBehavior {
             return handleElectionTimeout(message);
         }
 
+        if (appendEntriesMessageAssembler.handleMessage(message, actor())) {
+            return this;
+        }
+
         if (!(message instanceof RaftRPC)) {
             // The rest of the processing requires the message to be a RaftRPC
             return null;
@@ -596,6 +609,7 @@ public class Follower extends AbstractRaftActorBehavior {
     public void close() {
         closeSnapshotTracker();
         stopElection();
+        appendEntriesMessageAssembler.close();
     }
 
     @VisibleForTesting
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerIdentifier.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerIdentifier.java
new file mode 100644 (file)
index 0000000..32c6da4
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.yangtools.util.AbstractStringIdentifier;
+
+/**
+ * An Identifier for a follower.
+ *
+ * @author Thomas Pantelis
+ */
+class FollowerIdentifier extends AbstractStringIdentifier<FollowerIdentifier> {
+    private static final long serialVersionUID = 1L;
+
+    FollowerIdentifier(String followerId) {
+        super(followerId);
+    }
+
+    private Object writeReplace() {
+        return new Proxy(this);
+    }
+
+    private static class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private FollowerIdentifier identifier;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+        }
+
+        Proxy(FollowerIdentifier identifier) {
+            this.identifier = identifier;
+        }
+
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeObject(identifier.getValue());
+        }
+
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            identifier = new FollowerIdentifier((String) in.readObject());
+        }
+
+        private Object readResolve() {
+            return identifier;
+        }
+    }
+}
index 6b7e037a339783fb9f2f163648b6cee2f72f1790..f8969fcfc14892f4fbf000bbe00af4a17a8f07d3 100644 (file)
@@ -36,7 +36,7 @@ class SnapshotTracker implements AutoCloseable {
         this.log = log;
         this.totalChunks = totalChunks;
         this.leaderId = Preconditions.checkNotNull(leaderId);
-        fileBackedStream = context.newFileBackedOutputStream();
+        fileBackedStream = context.getFileBackedOutputStreamFactory().newInstance();
         bufferedStream = new BufferedOutputStream(fileBackedStream);
     }
 
index 82c049d7cfd54ccae50fa1ad9048c02b550ba8ac..e9fc53afbe088d81dbdf7b4c6679453dcc2950f9 100644 (file)
@@ -107,6 +107,29 @@ public final class ServerConfigurationPayload extends Payload implements Persist
         return serializedSize;
     }
 
+    @Override
+    public int hashCode() {
+        return serverConfig.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+
+        if (obj == null) {
+            return false;
+        }
+
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+
+        ServerConfigurationPayload other = (ServerConfigurationPayload) obj;
+        return serverConfig.equals(other.serverConfig);
+    }
+
     @Override
     public String toString() {
         return "ServerConfigurationPayload [serverConfig=" + serverConfig + "]";
index 5959df768fc6bd3359edfef777331b8bbcc9bb24..c50a10894a6ff5600150bfbda04c0dfc22ddb01a 100644 (file)
@@ -216,6 +216,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
     protected long currentTerm;
 
     protected int snapshotBatchCount = 4;
+    protected int snapshotChunkSize = SNAPSHOT_CHUNK_SIZE;
 
     protected List<MockPayload> expSnapshotState = new ArrayList<>();
 
@@ -233,7 +234,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         configParams.setSnapshotBatchCount(snapshotBatchCount);
         configParams.setSnapshotDataThresholdPercentage(70);
         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
-        configParams.setSnapshotChunkSize(SNAPSHOT_CHUNK_SIZE);
+        configParams.setSnapshotChunkSize(snapshotChunkSize);
         return configParams;
     }
 
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationWithSlicedPayloadIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationWithSlicedPayloadIntegrationTest.java
new file mode 100644 (file)
index 0000000..ea6de41
--- /dev/null
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+
+/**
+ * Tests end-to-end replication of sliced log entry payloads, ie entries whose size exceeds the maximum size for a
+ * single AppendEntries message.
+ *
+ * @author Thomas Pantelis
+ */
+public class ReplicationWithSlicedPayloadIntegrationTest extends AbstractRaftActorIntegrationTest {
+
+    @Test
+    public void runTest() throws Exception {
+        testLog.info("ReplicationWithSlicedPayloadIntegrationTest starting");
+
+        // Create the leader and 2 follower actors.
+
+        snapshotChunkSize = 20;
+
+        DefaultConfigParamsImpl followerConfigParams = newFollowerConfigParams();
+        followerConfigParams.setSnapshotBatchCount(snapshotBatchCount);
+        follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+                follower2Id, testActorPath(follower2Id)), followerConfigParams);
+
+        follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+                follower1Id, testActorPath(follower1Id)), followerConfigParams);
+
+        peerAddresses = ImmutableMap.<String, String>builder()
+                .put(follower1Id, follower1Actor.path().toString())
+                .put(follower2Id, follower2Actor.path().toString()).build();
+
+        leaderConfigParams = newLeaderConfigParams();
+        leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
+
+        follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
+        follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
+        leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
+
+        leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+
+        waitUntilLeader(leaderActor);
+
+        currentTerm = leaderContext.getTermInformation().getCurrentTerm();
+
+        // Send a large payload that exceeds the size threshold and needs to be sliced.
+
+        MockPayload largePayload = sendPayloadData(leaderActor, "large", snapshotChunkSize + 1);
+
+        // Then send a small payload that does not need to be sliced.
+
+        MockPayload smallPayload = sendPayloadData(leaderActor, "normal", snapshotChunkSize - 1);
+
+        final List<ApplyState> leaderApplyState = expectMatching(leaderCollectorActor, ApplyState.class, 2);
+        verifyApplyState(leaderApplyState.get(0), leaderCollectorActor,
+                largePayload.toString(), currentTerm, 0, largePayload);
+        verifyApplyState(leaderApplyState.get(1), leaderCollectorActor,
+                smallPayload.toString(), currentTerm, 1, smallPayload);
+
+        final List<ApplyState> follower1ApplyState = expectMatching(follower1CollectorActor, ApplyState.class, 2);
+        verifyApplyState(follower1ApplyState.get(0), null, null, currentTerm, 0, largePayload);
+        verifyApplyState(follower1ApplyState.get(1), null, null, currentTerm, 1, smallPayload);
+
+        final List<ApplyState> follower2ApplyState = expectMatching(follower2CollectorActor, ApplyState.class, 2);
+        verifyApplyState(follower2ApplyState.get(0), null, null, currentTerm, 0, largePayload);
+        verifyApplyState(follower2ApplyState.get(1), null, null, currentTerm, 1, smallPayload);
+
+        testLog.info("ReplicationWithSlicedPayloadIntegrationTest ending");
+    }
+}
index 3f96485351c7bca8a11ee3b4664f5d921370ae3f..fdfaa046fd5d2870ccb582190a16f66c76cfb5df 100644 (file)
@@ -38,7 +38,7 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
 import org.opendaylight.controller.cluster.raft.SnapshotManager.LastAppliedTermInformationReader;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -97,7 +97,8 @@ public class SnapshotManagerTest extends AbstractActorTest {
         doReturn(5L).when(mockElectionTerm).getCurrentTerm();
         doReturn("member5").when(mockElectionTerm).getVotedFor();
 
-        doReturn(new FileBackedOutputStream(10000000, "target")).when(mockRaftActorContext).newFileBackedOutputStream();
+        doReturn(new FileBackedOutputStreamFactory(10000000, "target"))
+                .when(mockRaftActorContext).getFileBackedOutputStreamFactory();
 
         snapshotManager = new SnapshotManager(mockRaftActorContext, LoggerFactory.getLogger(this.getClass()));
         factory = new TestActorFactory(getSystem());
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerIdentifierTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerIdentifierTest.java
new file mode 100644 (file)
index 0000000..8f0b613
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+
+/**
+ * Unit tests for FollowerIdentifier.
+ *
+ * @author Thomas Pantelis
+ */
+public class FollowerIdentifierTest {
+
+    @Test
+    public void testSerialization() throws FileNotFoundException, IOException {
+        FollowerIdentifier expected = new FollowerIdentifier("follower1");
+        FollowerIdentifier cloned = (FollowerIdentifier) SerializationUtils.clone(expected);
+        assertEquals("cloned", expected, cloned);
+    }
+}
index a8ecf71c47943c023cdbd64f0a63462b4084ef09..930c1968ace2465936ff2c42b37869f7742012b6 100644 (file)
@@ -39,8 +39,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.SerializationUtils;
 import org.junit.After;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.messaging.MessageSlice;
+import org.opendaylight.controller.cluster.messaging.MessageSliceReply;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
@@ -70,6 +73,7 @@ import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEnt
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.yangtools.concepts.Identifier;
@@ -160,7 +164,10 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
-        MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
+        return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
+    }
+
+    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index, Payload payload) {
         SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
         actorContext.getReplicatedLog().append(newEntry);
         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
@@ -2230,6 +2237,145 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
     }
 
+    @Test
+    public void testReplicationWithPayloadSizeThatExceedsThreshold() {
+        logStart("testReplicationWithPayloadSizeThatExceedsThreshold");
+
+        final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1,
+                Arrays.asList(new SimpleReplicatedLogEntry(0, 1,
+                        new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length;
+        final MockRaftActorContext.MockPayload largePayload =
+                new MockRaftActorContext.MockPayload("large", serializedSize);
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(300, TimeUnit.MILLISECONDS));
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50);
+        leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+        leaderActorContext.setCommitIndex(-1);
+        leaderActorContext.setLastApplied(-1);
+
+        leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
+
+        // Send initial heartbeat reply so follower is marked active
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Send normal payload first to prime commit index.
+        final long term = leaderActorContext.getTermInformation().getCurrentTerm();
+        sendReplicate(leaderActorContext, term, 0);
+
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("Entries size", 1, appendEntries.getEntries().size());
+        assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex());
+
+        leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0));
+        assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex());
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced.
+        sendReplicate(leaderActorContext, term, 1, largePayload);
+
+        MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+        assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex());
+        assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices());
+
+        final Identifier slicingId = messageSlice.getIdentifier();
+
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
+        assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+        assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+        assertEquals("Entries size", 0, appendEntries.getEntries().size());
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress.
+
+        // Sleep for the heartbeat interval so AppendEntries is sent.
+        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+                .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+        assertEquals("Entries size", 0, appendEntries.getEntries().size());
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Simulate the MessageSliceReply's and AppendEntriesReply from the follower.
+
+        leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor));
+        messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+        assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex());
+
+        leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor));
+
+        leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0));
+
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Send another normal payload.
+
+        sendReplicate(leaderActorContext, term, 2);
+
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("Entries size", 1, appendEntries.getEntries().size());
+        assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit());
+    }
+
+    @Test
+    public void testLargePayloadSlicingExpiration() {
+        logStart("testLargePayloadSlicingExpiration");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(100, TimeUnit.MILLISECONDS));
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10);
+        leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+        leaderActorContext.setCommitIndex(-1);
+        leaderActorContext.setLastApplied(-1);
+
+        final long term = leaderActorContext.getTermInformation().getCurrentTerm();
+        leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
+
+        // Send initial heartbeat reply so follower is marked active
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
+        MessageCollectorActor.clearMessages(followerActor);
+
+        sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large",
+                leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1));
+        MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+
+        // Sleep for at least 3 * election timeout so the slicing state expires.
+        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+                .getElectionTimeOutInterval().toMillis() * 3  + 50, TimeUnit.MILLISECONDS);
+        MessageCollectorActor.clearMessages(followerActor);
+
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+        assertEquals("Entries size", 0, appendEntries.getEntries().size());
+
+        MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300);
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Send an AppendEntriesReply - this should restart the slicing.
+
+        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+                .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS);
+
+        leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0));
+
+        MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+    }
+
     @Override
     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
             ActorRef actorRef, RaftRPC rpc) throws Exception {
index f52d8e4e9d3b5d2f6e4a2812268610915abca723..658e2ca53fb4843cdf4f9e4c5a4b9a1b6512b1b2 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
@@ -27,6 +28,7 @@ import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +60,9 @@ public class SnapshotTrackerTest {
         chunk3 = getNextChunk(byteString, 20, byteString.size());
 
         fbos = spy(new FileBackedOutputStream(100000000, "target"));
-        doReturn(fbos).when(mockContext).newFileBackedOutputStream();
+        FileBackedOutputStreamFactory mockFactory = mock(FileBackedOutputStreamFactory.class);
+        doReturn(fbos).when(mockFactory).newInstance();
+        doReturn(mockFactory).when(mockContext).getFileBackedOutputStreamFactory();
     }
 
     @Test
index 0cd4be67a50f1e3c2fc01f3b26e3a7f2bf0d97e2..75948830be824b081b35922c6cdeb89aaccd9e3a 100644 (file)
@@ -39,4 +39,13 @@ public class FileBackedOutputStreamFactory {
     public FileBackedOutputStream newInstance() {
         return new FileBackedOutputStream(fileThreshold, fileDirectory);
     }
+
+    /**
+     * Creates a new {@link SharedFileBackedOutputStream} with the settings configured for this factory.
+     *
+     * @return a {@link SharedFileBackedOutputStream} instance
+     */
+    public SharedFileBackedOutputStream newSharedInstance() {
+        return new SharedFileBackedOutputStream(fileThreshold, fileDirectory);
+    }
 }
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/SharedFileBackedOutputStream.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/SharedFileBackedOutputStream.java
new file mode 100644 (file)
index 0000000..852473b
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.io;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+/**
+ * A FileBackedOutputStream that allows for sharing in that it maintains a usage count and the backing file isn't
+ * deleted until the usage count reaches 0. The usage count is initialized to 1 on construction. Subsequent users of
+ * the instance must call {@link #incrementUsageCount()}. The {@link #cleanup()} method decrements the usage count and,
+ * when it reaches 0, the {@link FileBackedOutputStream#cleanup()} is called to delete the backing file.
+ *
+ * @author Thomas Pantelis
+ */
+public class SharedFileBackedOutputStream extends FileBackedOutputStream {
+    private final AtomicInteger usageCount = new AtomicInteger(1);
+    @SuppressWarnings("rawtypes")
+    private Consumer onCleanupCallback;
+    private Object onCleanupContext;
+
+    public SharedFileBackedOutputStream(int fileThreshold, String fileDirectory) {
+        super(fileThreshold, fileDirectory);
+    }
+
+    /**
+     * Increments the usage count. This must be followed by a corresponding call to {@link #cleanup()} when this
+     * instance is no longer needed.
+     */
+    public void incrementUsageCount() {
+        usageCount.getAndIncrement();
+    }
+
+    /**
+     * Returns the current usage count.
+     *
+     * @return the current usage count
+     */
+    public int getUsageCount() {
+        return usageCount.get();
+    }
+
+    /**
+     * Sets the callback to be notified when {@link FileBackedOutputStream#cleanup()} is called to delete the backing
+     * file.
+     */
+    public <T> void setOnCleanupCallback(Consumer<T> callback, T context) {
+        onCleanupCallback = callback;
+        onCleanupContext = context;
+    }
+
+    /**
+     * Overridden to decrement the usage count.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void cleanup() {
+        Preconditions.checkState(usageCount.get() > 0);
+
+        if (usageCount.decrementAndGet() == 0) {
+            super.cleanup();
+
+            if (onCleanupCallback != null) {
+                onCleanupCallback.accept(onCleanupContext);
+            }
+        }
+    }
+}
index bcb1850d8c30cef9b204a9a3666f7fb51c5ba1bf..a7abf7ca9740942285d8f14f13009ebc8ce3b58a 100644 (file)
@@ -40,27 +40,24 @@ public class FileBackedOutputStreamTest {
 
     @BeforeClass
     public static void staticSetup() {
-        File dir = new File(TEMP_DIR);
-        if (!dir.exists() && !dir.mkdirs()) {
-            throw new RuntimeException("Failed to create temp dir " + TEMP_DIR);
-        }
+        createDir(TEMP_DIR);
     }
 
     @AfterClass
     public static void staticCleanup() {
-        deleteTempFiles();
+        deleteTempFiles(TEMP_DIR);
         deleteFile(TEMP_DIR);
     }
 
     @Before
     public void setup() {
-        deleteTempFiles();
+        deleteTempFiles(TEMP_DIR);
         FileBackedOutputStream.REFERENCE_CACHE.clear();
     }
 
     @After
     public void cleanup() {
-        deleteTempFiles();
+        deleteTempFiles(TEMP_DIR);
     }
 
     @Test
@@ -72,7 +69,7 @@ public class FileBackedOutputStreamTest {
             fbos.write(bytes, 1, bytes.length - 1);
 
             assertEquals("getCount", bytes.length, fbos.getCount());
-            assertNull("Found unexpected temp file", findTempFileName());
+            assertNull("Found unexpected temp file", findTempFileName(TEMP_DIR));
             assertEquals("Size", bytes.length, fbos.asByteSource().size());
 
             // Read bytes twice.
@@ -95,13 +92,13 @@ public class FileBackedOutputStreamTest {
             fbos.write(bytes[0]);
             fbos.write(bytes, 1, 11);
 
-            String tempFileName = findTempFileName();
+            String tempFileName = findTempFileName(TEMP_DIR);
             assertNotNull("Expected temp file created", tempFileName);
 
             fbos.write(bytes[12]);
             fbos.write(bytes, 13, bytes.length - 13);
 
-            assertEquals("Temp file", tempFileName, findTempFileName());
+            assertEquals("Temp file", tempFileName, findTempFileName(TEMP_DIR));
             assertEquals("Size", bytes.length, fbos.asByteSource().size());
 
             InputStream inputStream = fbos.asByteSource().openStream();
@@ -121,7 +118,7 @@ public class FileBackedOutputStreamTest {
 
             assertEquals("Reference cache size", 0, FileBackedOutputStream.REFERENCE_CACHE.size());
 
-            assertNull("Found unexpected temp file", findTempFileName());
+            assertNull("Found unexpected temp file", findTempFileName(TEMP_DIR));
         }
 
         LOG.info("testFileThresholdReachedWithWriteBytes ending");
@@ -135,12 +132,12 @@ public class FileBackedOutputStreamTest {
             fbos.write(bytes[0]);
             fbos.write(bytes[1]);
 
-            assertNull("Found unexpected temp file", findTempFileName());
+            assertNull("Found unexpected temp file", findTempFileName(TEMP_DIR));
 
             fbos.write(bytes[2]);
             fbos.flush();
 
-            assertNotNull("Expected temp file created", findTempFileName());
+            assertNotNull("Expected temp file created", findTempFileName(TEMP_DIR));
 
             assertEquals("Size", bytes.length, fbos.asByteSource().size());
             assertArrayEquals("Read bytes", bytes, fbos.asByteSource().read());
@@ -156,7 +153,7 @@ public class FileBackedOutputStreamTest {
             byte[] bytes = new byte[]{0, 1, 2};
             fbos.write(bytes);
 
-            assertNull("Found unexpected temp file", findTempFileName());
+            assertNull("Found unexpected temp file", findTempFileName(TEMP_DIR));
             assertEquals("Size", bytes.length, fbos.asByteSource().size());
 
             // Should throw IOException after call to asByteSource.
@@ -172,7 +169,7 @@ public class FileBackedOutputStreamTest {
         try {
             fbos = new FileBackedOutputStream(1, TEMP_DIR);
             fbos.write(new byte[] {0, 1});
-            assertNotNull("Expected temp file created", findTempFileName());
+            assertNotNull("Expected temp file created", findTempFileName(TEMP_DIR));
         } finally {
             if (fbos != null) {
                 fbos.close();
@@ -183,7 +180,7 @@ public class FileBackedOutputStreamTest {
         Stopwatch sw = Stopwatch.createStarted();
         while (sw.elapsed(TimeUnit.SECONDS) <= 20) {
             System.gc();
-            if (findTempFileName() == null) {
+            if (findTempFileName(TEMP_DIR) == null) {
                 return;
             }
             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
@@ -192,23 +189,30 @@ public class FileBackedOutputStreamTest {
         fail("Temp file was not deleted");
     }
 
-    private static String findTempFileName() {
-        String[] files = new File(TEMP_DIR).list();
+    static String findTempFileName(String dirPath) {
+        String[] files = new File(dirPath).list();
         assertNotNull(files);
         assertTrue("Found more than one temp file: " + Arrays.toString(files), files.length < 2);
         return files.length == 1 ? files[0] : null;
     }
 
-    private static boolean deleteFile(String file) {
+    static boolean deleteFile(String file) {
         return new File(file).delete();
     }
 
-    private static void deleteTempFiles() {
-        String[] files = new File(TEMP_DIR).list();
+    static void deleteTempFiles(String path) {
+        String[] files = new File(path).list();
         if (files != null) {
             for (String file: files) {
-                deleteFile(TEMP_DIR + File.separator + file);
+                deleteFile(path + File.separator + file);
             }
         }
     }
+
+    static void createDir(String path) {
+        File dir = new File(path);
+        if (!dir.exists() && !dir.mkdirs()) {
+            throw new RuntimeException("Failed to create temp dir " + path);
+        }
+    }
 }
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/io/SharedFileBackedOutputStreamTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/io/SharedFileBackedOutputStreamTest.java
new file mode 100644 (file)
index 0000000..0d6f581
--- /dev/null
@@ -0,0 +1,109 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.io;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for SharedFileBackedOutputStream.
+ *
+ * @author Thomas Pantelis
+ */
+public class SharedFileBackedOutputStreamTest {
+    private static final Logger LOG = LoggerFactory.getLogger(SharedFileBackedOutputStreamTest.class);
+    private static final String TEMP_DIR = "target/FileBackedOutputStreamTest";
+
+    @BeforeClass
+    public static void staticSetup() {
+        FileBackedOutputStreamTest.createDir(TEMP_DIR);
+    }
+
+    @AfterClass
+    public static void staticCleanup() {
+        FileBackedOutputStreamTest.deleteTempFiles(TEMP_DIR);
+        FileBackedOutputStreamTest.deleteFile(TEMP_DIR);
+    }
+
+    @Before
+    public void setup() {
+        FileBackedOutputStreamTest.deleteTempFiles(TEMP_DIR);
+        FileBackedOutputStream.REFERENCE_CACHE.clear();
+    }
+
+    @After
+    public void cleanup() {
+        FileBackedOutputStreamTest.deleteTempFiles(TEMP_DIR);
+    }
+
+    @Test
+    public void testSingleUsage() throws IOException {
+        LOG.info("testSingleUsage starting");
+        try (SharedFileBackedOutputStream fbos = new SharedFileBackedOutputStream(5, TEMP_DIR)) {
+            byte[] bytes = new byte[]{0, 1, 2, 3, 4, 5, 6};
+            fbos.write(bytes);
+
+            assertNotNull("Expected temp file created", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR));
+            fbos.cleanup();
+            assertNull("Found unexpected temp file", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR));
+        }
+
+        LOG.info("testSingleUsage ending");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testSharing() throws IOException {
+        LOG.info("testSharing starting");
+        try (SharedFileBackedOutputStream fbos = new SharedFileBackedOutputStream(5, TEMP_DIR)) {
+            String context = "context";
+            Consumer<String> mockCallback = Mockito.mock(Consumer.class);
+            fbos.setOnCleanupCallback(mockCallback , context);
+
+            byte[] bytes = new byte[]{0, 1, 2, 3, 4, 5, 6};
+            fbos.write(bytes);
+
+            assertNotNull("Expected temp file created", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR));
+
+            fbos.incrementUsageCount();
+            fbos.cleanup();
+            assertNotNull("Expected temp file exists", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR));
+
+            fbos.incrementUsageCount();
+            fbos.incrementUsageCount();
+
+            fbos.cleanup();
+            assertNotNull("Expected temp file exists", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR));
+
+            fbos.cleanup();
+            assertNotNull("Expected temp file exists", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR));
+
+            verify(mockCallback, never()).accept(context);
+
+            fbos.cleanup();
+            assertNull("Found unexpected temp file", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR));
+
+            verify(mockCallback).accept(context);
+        }
+
+        LOG.info("testSharing ending");
+    }
+}