From: Tom Pantelis Date: Thu, 18 May 2017 13:23:04 +0000 (-0400) Subject: Bug 2890: Chunk AppendEntries when single payload size exceeds threshold X-Git-Tag: release/nitrogen~78 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=8d90cf04be86f872f7eeb892d37517d5ee087157 Bug 2890: Chunk AppendEntries when single payload size exceeds threshold Utilizes the MessageSlicer in AbstractLeader to slice/chunk AppendEntries messages whose single log entry payloas exceeds the max size threshold. The MessageAssembler is used in the Follower to re-assemble. For efficiency, with multiple followers, the AbstractLeader reuses the FileBackedOutputStream containing the serialized AppendEntries data. However, since the MessageSlicer takes ownership of the FileBackedOutputStream and cleans it up when slicing is complete, I added a SharedFileBackedOutputStream class that maintains a usage count and performs cleanup when the usage count reaches 0. The AbstractLeader maintains a Map of SharedFileBackedOutputStream instances keyed by log index. The FollowerLogInformation keeps track of whether or not slicing is in progress for the follower. Same as with install snapshot, we only want to send empty AppendEntries as heartbeats. Change-Id: Id163944b9989f6cb39a6aaaa98d1f3c4b0026bbe Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java index 123d206d05..6f9efbbfac 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java @@ -16,6 +16,7 @@ import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotS * The state of the followers log as known by the Leader. */ public interface FollowerLogInformation { + long NO_INDEX = -1; /** * Increments the value of the follower's next index. @@ -160,4 +161,19 @@ public interface FollowerLogInformation { * Clears the LeaderInstallSnapshotState when an install snapshot is complete. */ void clearLeaderInstallSnapshotState(); + + /** + * Sets the index of the log entry whose payload size exceeds the maximum size for a single message and thus + * needs to be sliced into smaller chunks. + * + * @param index the log entry index or NO_INDEX to clear it + */ + void setSlicedLogEntryIndex(long index); + + /** + * Return whether or not log entry slicing is currently in progress. + * + * @return true if slicing is currently in progress, false otherwise + */ + boolean isLogEntrySlicingInProgress(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java index 883bfbb4e4..8bd0cf3d34 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java @@ -46,6 +46,8 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { private LeaderInstallSnapshotState installSnapshotState; + private long slicedLogEntryIndex = NO_INDEX; + /** * Constructs an instance. * @@ -92,6 +94,12 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { @Override public boolean setMatchIndex(long matchIndex) { + // If the new match index is the index of the entry currently being sliced, then we know slicing is complete + // and the follower received the entry and responded so clear the slicedLogEntryIndex + if (isLogEntrySlicingInProgress() && slicedLogEntryIndex == matchIndex) { + slicedLogEntryIndex = NO_INDEX; + } + if (this.matchIndex != matchIndex) { this.matchIndex = matchIndex; return true; @@ -210,6 +218,16 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { installSnapshotState = null; } + @Override + public void setSlicedLogEntryIndex(long index) { + slicedLogEntryIndex = index; + } + + @Override + public boolean isLogEntrySlicingInProgress() { + return slicedLogEntryIndex != NO_INDEX; + } + @Override public String toString() { return "FollowerLogInformationImpl [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java index 0fe6cf1e2f..aed050c3c0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java @@ -21,7 +21,7 @@ import java.util.function.LongSupplier; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.DataPersistenceProvider; -import org.opendaylight.controller.cluster.io.FileBackedOutputStream; +import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; @@ -329,12 +329,12 @@ public interface RaftActorContext { Consumer getApplyStateConsumer(); /** - * Creates a FileBackedOutputStream with a common configuration. + * Returns the {@link FileBackedOutputStreamFactory} instance with a common configuration. * - * @return a FileBackedOutputStream instance + * @return the {@link FileBackedOutputStreamFactory}; */ @Nonnull - FileBackedOutputStream newFileBackedOutputStream(); + FileBackedOutputStreamFactory getFileBackedOutputStreamFactory(); /** * Returns the RaftActorLeadershipTransferCohort if leadership transfer is in progress. diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index 5b130db210..4fd5666ae2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -29,7 +29,7 @@ import java.util.function.LongSupplier; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.DataPersistenceProvider; -import org.opendaylight.controller.cluster.io.FileBackedOutputStream; +import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; @@ -89,6 +89,8 @@ public class RaftActorContextImpl implements RaftActorContext { private final Consumer applyStateConsumer; + private final FileBackedOutputStreamFactory fileBackedOutputStreamFactory; + private RaftActorLeadershipTransferCohort leadershipTransferCohort; public RaftActorContextImpl(ActorRef actor, ActorContext context, String id, @@ -107,6 +109,9 @@ public class RaftActorContextImpl implements RaftActorContext { this.log = Preconditions.checkNotNull(logger); this.applyStateConsumer = Preconditions.checkNotNull(applyStateConsumer); + fileBackedOutputStreamFactory = new FileBackedOutputStreamFactory( + configParams.getFileBackedStreamingThreshold(), configParams.getTempFileDirectory()); + for (Map.Entry e: Preconditions.checkNotNull(peerAddresses).entrySet()) { peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING)); } @@ -404,9 +409,8 @@ public class RaftActorContextImpl implements RaftActorContext { } @Override - public FileBackedOutputStream newFileBackedOutputStream() { - return new FileBackedOutputStream(configParams.getFileBackedStreamingThreshold(), - configParams.getTempFileDirectory()); + public FileBackedOutputStreamFactory getFileBackedOutputStreamFactory() { + return fileBackedOutputStreamFactory; } @SuppressWarnings("checkstyle:IllegalCatch") diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index 71ef8ffcea..7ff47f93d9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -278,7 +278,7 @@ public class SnapshotManager implements SnapshotState { OutputStream installSnapshotStream = null; if (targetFollower != null) { - installSnapshotStream = context.newFileBackedOutputStream(); + installSnapshotStream = context.getFileBackedOutputStreamFactory().newInstance(); log.info("{}: Initiating snapshot capture {} to install on {}", persistenceId(), captureSnapshot, targetFollower); } else { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 097f0ec677..9fc7b3393c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -17,6 +17,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.io.ByteSource; import java.io.IOException; +import java.io.ObjectOutputStream; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -26,7 +27,11 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Queue; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.io.SharedFileBackedOutputStream; +import org.opendaylight.controller.cluster.messaging.MessageSlicer; +import org.opendaylight.controller.cluster.messaging.SliceOptions; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; @@ -85,6 +90,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { */ private final Queue trackers = new LinkedList<>(); + /** + * Map of serialized AppendEntries output streams keyed by log index. This is used in conjunction with the + * appendEntriesMessageSlicer for slicing single ReplicatedLogEntry payloads that exceed the message size threshold. + * This Map allows the SharedFileBackedOutputStreams to be reused for multiple followers. + */ + private final Map sharedSerializedAppendEntriesStreams = new HashMap<>(); + private final MessageSlicer appendEntriesMessageSlicer; + private Cancellable heartbeatSchedule = null; private Optional snapshotHolder = Optional.absent(); private int minReplicationCount; @@ -93,6 +106,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { @Nullable AbstractLeader initializeFromLeader) { super(context, state); + appendEntriesMessageSlicer = MessageSlicer.builder().logContext(logName()) + .messageSliceSize(context.getConfigParams().getSnapshotChunkSize()) + .expireStateAfterInactivity(context.getConfigParams().getElectionTimeOutInterval().toMillis() * 3, + TimeUnit.MILLISECONDS).build(); + if (initializeFromLeader != null) { followerToLog.putAll(initializeFromLeader.followerToLog); snapshotHolder = initializeFromLeader.snapshotHolder; @@ -438,6 +456,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { public RaftActorBehavior handleMessage(ActorRef sender, Object message) { Preconditions.checkNotNull(sender, "sender should not be null"); + if (appendEntriesMessageSlicer.handleMessage(message)) { + return this; + } + if (message instanceof RaftRPC) { RaftRPC rpc = (RaftRPC) message; // If RPC request or response contains term T > currentTerm: @@ -644,6 +666,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // we send a heartbeat even if we have not received a reply for the last chunk sendAppendEntries = true; } + } else if (followerLogInformation.isLogEntrySlicingInProgress()) { + sendAppendEntries = sendHeartbeat; } else { long leaderLastIndex = context.getReplicatedLog().lastIndex(); long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); @@ -660,11 +684,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerNextIndex, followerId); if (followerLogInformation.okToReplicate()) { - // Try to send all the entries in the journal but not exceeding the max data size - // for a single AppendEntries message. - int maxEntries = (int) context.getReplicatedLog().size(); - entries = context.getReplicatedLog().getFrom(followerNextIndex, maxEntries, - context.getConfigParams().getSnapshotChunkSize()); + entries = getEntriesToSend(followerLogInformation, followerActor); sendAppendEntries = true; } } else if (isFollowerActive && followerNextIndex >= 0 @@ -705,6 +725,76 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } + private List getEntriesToSend(FollowerLogInformation followerLogInfo, + ActorSelection followerActor) { + // Try to get all the entries in the journal but not exceeding the max data size for a single AppendEntries + // message. + int maxEntries = (int) context.getReplicatedLog().size(); + final int maxDataSize = context.getConfigParams().getSnapshotChunkSize(); + final long followerNextIndex = followerLogInfo.getNextIndex(); + List entries = context.getReplicatedLog().getFrom(followerNextIndex, + maxEntries, maxDataSize); + + // If the first entry's size exceeds the max data size threshold, it will be returned from the call above. If + // that is the case, then we need to slice it into smaller chunks. + if (!(entries.size() == 1 && entries.get(0).getData().size() > maxDataSize)) { + // Don't need to slice. + return entries; + } + + log.debug("{}: Log entry size {} exceeds max payload size {}", logName(), entries.get(0).getData().size(), + maxDataSize); + + // If an AppendEntries has already been serialized for the log index then reuse the + // SharedFileBackedOutputStream. + final Long logIndex = entries.get(0).getIndex(); + SharedFileBackedOutputStream fileBackedStream = sharedSerializedAppendEntriesStreams.get(logIndex); + if (fileBackedStream == null) { + fileBackedStream = context.getFileBackedOutputStreamFactory().newSharedInstance(); + + final AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(), + getLogEntryIndex(followerNextIndex - 1), getLogEntryTerm(followerNextIndex - 1), entries, + context.getCommitIndex(), getReplicatedToAllIndex(), context.getPayloadVersion()); + + log.debug("{}: Serializing {} for slicing for follower {}", logName(), appendEntries, + followerLogInfo.getId()); + + try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) { + out.writeObject(appendEntries); + } catch (IOException e) { + log.error("{}: Error serializing {}", logName(), appendEntries, e); + fileBackedStream.cleanup(); + return Collections.emptyList(); + } + + sharedSerializedAppendEntriesStreams.put(logIndex, fileBackedStream); + + fileBackedStream.setOnCleanupCallback(index -> { + log.debug("{}: On SharedFileBackedOutputStream cleanup for index {}", logName(), index); + sharedSerializedAppendEntriesStreams.remove(index); + }, logIndex); + } else { + log.debug("{}: Reusing SharedFileBackedOutputStream for follower {}", logName(), followerLogInfo.getId()); + fileBackedStream.incrementUsageCount(); + } + + log.debug("{}: Slicing stream for index {}, follower {}", logName(), logIndex, followerLogInfo.getId()); + + // Record that slicing is in progress for the follower. + followerLogInfo.setSlicedLogEntryIndex(logIndex); + + final FollowerIdentifier identifier = new FollowerIdentifier(followerLogInfo.getId()); + appendEntriesMessageSlicer.slice(SliceOptions.builder().identifier(identifier) + .fileBackedOutputStream(fileBackedStream).sendTo(followerActor).replyTo(actor()) + .onFailureCallback(failure -> { + log.error("{}: Error slicing AppendEntries for follower {}", logName(), + followerLogInfo.getId(), failure); + followerLogInfo.setSlicedLogEntryIndex(FollowerLogInformation.NO_INDEX); + }).build()); + + return Collections.emptyList(); + } + private void sendAppendEntriesToFollower(ActorSelection followerActor, List entries, FollowerLogInformation followerLogInformation) { // In certain cases outlined below we don't want to send the actual commit index to prevent the follower from @@ -714,9 +804,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // empty AppendEntries as a heart beat to prevent election. // - if we're in the process of installing a snapshot. In this case we don't send any new entries but still // need to send AppendEntries to prevent election. + // - if we're in the process of slicing an AppendEntries with a large log entry payload. In this case we + // need to send an empty AppendEntries to prevent election. boolean isInstallingSnaphot = followerLogInformation.getInstallSnapshotState() != null; - long leaderCommitIndex = isInstallingSnaphot || !followerLogInformation.isFollowerActive() ? -1 : - context.getCommitIndex(); + long leaderCommitIndex = isInstallingSnaphot || followerLogInformation.isLogEntrySlicingInProgress() + || !followerLogInformation.isFollowerActive() ? -1 : context.getCommitIndex(); long followerNextIndex = followerLogInformation.getNextIndex(); AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(), @@ -858,6 +950,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (!followerToLog.isEmpty()) { log.trace("{}: Sending heartbeat", logName()); sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis(), true); + + appendEntriesMessageSlicer.checkExpiredSlicedMessageState(); } } @@ -889,6 +983,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { @Override public void close() { stopHeartBeat(); + appendEntriesMessageSlicer.close(); } @Override diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 1587719417..9f0e0327ce 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.messaging.MessageAssembler; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; @@ -56,6 +57,8 @@ public class Follower extends AbstractRaftActorBehavior { private final SyncStatusTracker initialSyncStatusTracker; + private final MessageAssembler appendEntriesMessageAssembler; + private final Stopwatch lastLeaderMessageTimer = Stopwatch.createStarted(); private SnapshotTracker snapshotTracker = null; private String leaderId; @@ -74,6 +77,10 @@ public class Follower extends AbstractRaftActorBehavior { initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), context.getConfigParams() .getSyncIndexThreshold()); + appendEntriesMessageAssembler = MessageAssembler.builder().logContext(logName()) + .filedBackedStreamFactory(context.getFileBackedOutputStreamFactory()) + .assembledMessageCallback((message, sender) -> handleMessage(sender, message)).build(); + if (context.getPeerIds().isEmpty() && getLeaderId() == null) { actor().tell(TimeoutNow.INSTANCE, actor()); } else { @@ -315,6 +322,8 @@ public class Follower extends AbstractRaftActorBehavior { super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex()); } + appendEntriesMessageAssembler.checkExpiredAssembledMessageState(); + return this; } @@ -396,6 +405,10 @@ public class Follower extends AbstractRaftActorBehavior { return handleElectionTimeout(message); } + if (appendEntriesMessageAssembler.handleMessage(message, actor())) { + return this; + } + if (!(message instanceof RaftRPC)) { // The rest of the processing requires the message to be a RaftRPC return null; @@ -596,6 +609,7 @@ public class Follower extends AbstractRaftActorBehavior { public void close() { closeSnapshotTracker(); stopElection(); + appendEntriesMessageAssembler.close(); } @VisibleForTesting diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerIdentifier.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerIdentifier.java new file mode 100644 index 0000000000..32c6da4b52 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerIdentifier.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.behaviors; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import org.opendaylight.yangtools.util.AbstractStringIdentifier; + +/** + * An Identifier for a follower. + * + * @author Thomas Pantelis + */ +class FollowerIdentifier extends AbstractStringIdentifier { + private static final long serialVersionUID = 1L; + + FollowerIdentifier(String followerId) { + super(followerId); + } + + private Object writeReplace() { + return new Proxy(this); + } + + private static class Proxy implements Externalizable { + private static final long serialVersionUID = 1L; + + private FollowerIdentifier identifier; + + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy() { + } + + Proxy(FollowerIdentifier identifier) { + this.identifier = identifier; + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(identifier.getValue()); + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + identifier = new FollowerIdentifier((String) in.readObject()); + } + + private Object readResolve() { + return identifier; + } + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java index 6b7e037a33..f8969fcfc1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java @@ -36,7 +36,7 @@ class SnapshotTracker implements AutoCloseable { this.log = log; this.totalChunks = totalChunks; this.leaderId = Preconditions.checkNotNull(leaderId); - fileBackedStream = context.newFileBackedOutputStream(); + fileBackedStream = context.getFileBackedOutputStreamFactory().newInstance(); bufferedStream = new BufferedOutputStream(fileBackedStream); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/ServerConfigurationPayload.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/ServerConfigurationPayload.java index 82c049d7cf..e9fc53afbe 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/ServerConfigurationPayload.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/ServerConfigurationPayload.java @@ -107,6 +107,29 @@ public final class ServerConfigurationPayload extends Payload implements Persist return serializedSize; } + @Override + public int hashCode() { + return serverConfig.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null) { + return false; + } + + if (getClass() != obj.getClass()) { + return false; + } + + ServerConfigurationPayload other = (ServerConfigurationPayload) obj; + return serverConfig.equals(other.serverConfig); + } + @Override public String toString() { return "ServerConfigurationPayload [serverConfig=" + serverConfig + "]"; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java index 5959df768f..c50a10894a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java @@ -216,6 +216,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest protected long currentTerm; protected int snapshotBatchCount = 4; + protected int snapshotChunkSize = SNAPSHOT_CHUNK_SIZE; protected List expSnapshotState = new ArrayList<>(); @@ -233,7 +234,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest configParams.setSnapshotBatchCount(snapshotBatchCount); configParams.setSnapshotDataThresholdPercentage(70); configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); - configParams.setSnapshotChunkSize(SNAPSHOT_CHUNK_SIZE); + configParams.setSnapshotChunkSize(snapshotChunkSize); return configParams; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationWithSlicedPayloadIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationWithSlicedPayloadIntegrationTest.java new file mode 100644 index 0000000000..ea6de4106a --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationWithSlicedPayloadIntegrationTest.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching; + +import com.google.common.collect.ImmutableMap; +import java.util.List; +import org.junit.Test; +import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; + +/** + * Tests end-to-end replication of sliced log entry payloads, ie entries whose size exceeds the maximum size for a + * single AppendEntries message. + * + * @author Thomas Pantelis + */ +public class ReplicationWithSlicedPayloadIntegrationTest extends AbstractRaftActorIntegrationTest { + + @Test + public void runTest() throws Exception { + testLog.info("ReplicationWithSlicedPayloadIntegrationTest starting"); + + // Create the leader and 2 follower actors. + + snapshotChunkSize = 20; + + DefaultConfigParamsImpl followerConfigParams = newFollowerConfigParams(); + followerConfigParams.setSnapshotBatchCount(snapshotBatchCount); + follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId), + follower2Id, testActorPath(follower2Id)), followerConfigParams); + + follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId), + follower1Id, testActorPath(follower1Id)), followerConfigParams); + + peerAddresses = ImmutableMap.builder() + .put(follower1Id, follower1Actor.path().toString()) + .put(follower2Id, follower2Actor.path().toString()).build(); + + leaderConfigParams = newLeaderConfigParams(); + leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams); + + follower1CollectorActor = follower1Actor.underlyingActor().collectorActor(); + follower2CollectorActor = follower2Actor.underlyingActor().collectorActor(); + leaderCollectorActor = leaderActor.underlyingActor().collectorActor(); + + leaderContext = leaderActor.underlyingActor().getRaftActorContext(); + + waitUntilLeader(leaderActor); + + currentTerm = leaderContext.getTermInformation().getCurrentTerm(); + + // Send a large payload that exceeds the size threshold and needs to be sliced. + + MockPayload largePayload = sendPayloadData(leaderActor, "large", snapshotChunkSize + 1); + + // Then send a small payload that does not need to be sliced. + + MockPayload smallPayload = sendPayloadData(leaderActor, "normal", snapshotChunkSize - 1); + + final List leaderApplyState = expectMatching(leaderCollectorActor, ApplyState.class, 2); + verifyApplyState(leaderApplyState.get(0), leaderCollectorActor, + largePayload.toString(), currentTerm, 0, largePayload); + verifyApplyState(leaderApplyState.get(1), leaderCollectorActor, + smallPayload.toString(), currentTerm, 1, smallPayload); + + final List follower1ApplyState = expectMatching(follower1CollectorActor, ApplyState.class, 2); + verifyApplyState(follower1ApplyState.get(0), null, null, currentTerm, 0, largePayload); + verifyApplyState(follower1ApplyState.get(1), null, null, currentTerm, 1, smallPayload); + + final List follower2ApplyState = expectMatching(follower2CollectorActor, ApplyState.class, 2); + verifyApplyState(follower2ApplyState.get(0), null, null, currentTerm, 0, largePayload); + verifyApplyState(follower2ApplyState.get(1), null, null, currentTerm, 1, smallPayload); + + testLog.info("ReplicationWithSlicedPayloadIntegrationTest ending"); + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java index 3f96485351..fdfaa046fd 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java @@ -38,7 +38,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.DataPersistenceProvider; -import org.opendaylight.controller.cluster.io.FileBackedOutputStream; +import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory; import org.opendaylight.controller.cluster.raft.SnapshotManager.LastAppliedTermInformationReader; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -97,7 +97,8 @@ public class SnapshotManagerTest extends AbstractActorTest { doReturn(5L).when(mockElectionTerm).getCurrentTerm(); doReturn("member5").when(mockElectionTerm).getVotedFor(); - doReturn(new FileBackedOutputStream(10000000, "target")).when(mockRaftActorContext).newFileBackedOutputStream(); + doReturn(new FileBackedOutputStreamFactory(10000000, "target")) + .when(mockRaftActorContext).getFileBackedOutputStreamFactory(); snapshotManager = new SnapshotManager(mockRaftActorContext, LoggerFactory.getLogger(this.getClass())); factory = new TestActorFactory(getSystem()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerIdentifierTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerIdentifierTest.java new file mode 100644 index 0000000000..8f0b613664 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerIdentifierTest.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.behaviors; + +import static org.junit.Assert.assertEquals; + +import java.io.FileNotFoundException; +import java.io.IOException; +import org.apache.commons.lang.SerializationUtils; +import org.junit.Test; + +/** + * Unit tests for FollowerIdentifier. + * + * @author Thomas Pantelis + */ +public class FollowerIdentifierTest { + + @Test + public void testSerialization() throws FileNotFoundException, IOException { + FollowerIdentifier expected = new FollowerIdentifier("follower1"); + FollowerIdentifier cloned = (FollowerIdentifier) SerializationUtils.clone(expected); + assertEquals("cloned", expected, cloned); + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index a8ecf71c47..930c1968ac 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -39,8 +39,11 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.SerializationUtils; import org.junit.After; import org.junit.Test; +import org.opendaylight.controller.cluster.messaging.MessageSlice; +import org.opendaylight.controller.cluster.messaging.MessageSliceReply; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; @@ -70,6 +73,7 @@ import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEnt import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy; import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.yangtools.concepts.Identifier; @@ -160,7 +164,10 @@ public class LeaderTest extends AbstractLeaderTest { } private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) { - MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo"); + return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo")); + } + + private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index, Payload payload) { SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload); actorContext.getReplicatedLog().append(newEntry); return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true)); @@ -2230,6 +2237,145 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100); } + @Test + public void testReplicationWithPayloadSizeThatExceedsThreshold() { + logStart("testReplicationWithPayloadSizeThatExceedsThreshold"); + + final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1, + Arrays.asList(new SimpleReplicatedLogEntry(0, 1, + new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length; + final MockRaftActorContext.MockPayload largePayload = + new MockRaftActorContext.MockPayload("large", serializedSize); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(300, TimeUnit.MILLISECONDS)); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50); + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + leaderActorContext.setCommitIndex(-1); + leaderActorContext.setLastApplied(-1); + + leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); + + // Send initial heartbeat reply so follower is marked active + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); + + // Send normal payload first to prime commit index. + final long term = leaderActorContext.getTermInformation().getCurrentTerm(); + sendReplicate(leaderActorContext, term, 0); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("Entries size", 1, appendEntries.getEntries().size()); + assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex()); + + leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0)); + assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex()); + MessageCollectorActor.clearMessages(followerActor); + + // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced. + sendReplicate(leaderActorContext, term, 1, largePayload); + + MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class); + assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex()); + assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices()); + + final Identifier slicingId = messageSlice.getIdentifier(); + + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex()); + assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm()); + assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit()); + assertEquals("Entries size", 0, appendEntries.getEntries().size()); + MessageCollectorActor.clearMessages(followerActor); + + // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress. + + // Sleep for the heartbeat interval so AppendEntries is sent. + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams() + .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); + + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); + + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit()); + assertEquals("Entries size", 0, appendEntries.getEntries().size()); + MessageCollectorActor.clearMessages(followerActor); + + // Simulate the MessageSliceReply's and AppendEntriesReply from the follower. + + leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor)); + messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class); + assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex()); + + leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor)); + + leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0)); + + MessageCollectorActor.clearMessages(followerActor); + + // Send another normal payload. + + sendReplicate(leaderActorContext, term, 2); + + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("Entries size", 1, appendEntries.getEntries().size()); + assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex()); + assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit()); + } + + @Test + public void testLargePayloadSlicingExpiration() { + logStart("testLargePayloadSlicingExpiration"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(100, TimeUnit.MILLISECONDS)); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10); + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + leaderActorContext.setCommitIndex(-1); + leaderActorContext.setLastApplied(-1); + + final long term = leaderActorContext.getTermInformation().getCurrentTerm(); + leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); + + // Send initial heartbeat reply so follower is marked active + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); + + sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large", + leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1)); + MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class); + + // Sleep for at least 3 * election timeout so the slicing state expires. + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams() + .getElectionTimeOutInterval().toMillis() * 3 + 50, TimeUnit.MILLISECONDS); + MessageCollectorActor.clearMessages(followerActor); + + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit()); + assertEquals("Entries size", 0, appendEntries.getEntries().size()); + + MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300); + MessageCollectorActor.clearMessages(followerActor); + + // Send an AppendEntriesReply - this should restart the slicing. + + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams() + .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS); + + leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0)); + + MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class); + } + @Override protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext, ActorRef actorRef, RaftRPC rpc) throws Exception { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java index f52d8e4e9d..658e2ca53f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.raft.behaviors; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -27,6 +28,7 @@ import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.io.FileBackedOutputStream; +import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +60,9 @@ public class SnapshotTrackerTest { chunk3 = getNextChunk(byteString, 20, byteString.size()); fbos = spy(new FileBackedOutputStream(100000000, "target")); - doReturn(fbos).when(mockContext).newFileBackedOutputStream(); + FileBackedOutputStreamFactory mockFactory = mock(FileBackedOutputStreamFactory.class); + doReturn(fbos).when(mockFactory).newInstance(); + doReturn(mockFactory).when(mockContext).getFileBackedOutputStreamFactory(); } @Test diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamFactory.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamFactory.java index 0cd4be67a5..75948830be 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamFactory.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamFactory.java @@ -39,4 +39,13 @@ public class FileBackedOutputStreamFactory { public FileBackedOutputStream newInstance() { return new FileBackedOutputStream(fileThreshold, fileDirectory); } + + /** + * Creates a new {@link SharedFileBackedOutputStream} with the settings configured for this factory. + * + * @return a {@link SharedFileBackedOutputStream} instance + */ + public SharedFileBackedOutputStream newSharedInstance() { + return new SharedFileBackedOutputStream(fileThreshold, fileDirectory); + } } diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/SharedFileBackedOutputStream.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/SharedFileBackedOutputStream.java new file mode 100644 index 0000000000..852473b283 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/SharedFileBackedOutputStream.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.io; + +import com.google.common.base.Preconditions; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +/** + * A FileBackedOutputStream that allows for sharing in that it maintains a usage count and the backing file isn't + * deleted until the usage count reaches 0. The usage count is initialized to 1 on construction. Subsequent users of + * the instance must call {@link #incrementUsageCount()}. The {@link #cleanup()} method decrements the usage count and, + * when it reaches 0, the {@link FileBackedOutputStream#cleanup()} is called to delete the backing file. + * + * @author Thomas Pantelis + */ +public class SharedFileBackedOutputStream extends FileBackedOutputStream { + private final AtomicInteger usageCount = new AtomicInteger(1); + @SuppressWarnings("rawtypes") + private Consumer onCleanupCallback; + private Object onCleanupContext; + + public SharedFileBackedOutputStream(int fileThreshold, String fileDirectory) { + super(fileThreshold, fileDirectory); + } + + /** + * Increments the usage count. This must be followed by a corresponding call to {@link #cleanup()} when this + * instance is no longer needed. + */ + public void incrementUsageCount() { + usageCount.getAndIncrement(); + } + + /** + * Returns the current usage count. + * + * @return the current usage count + */ + public int getUsageCount() { + return usageCount.get(); + } + + /** + * Sets the callback to be notified when {@link FileBackedOutputStream#cleanup()} is called to delete the backing + * file. + */ + public void setOnCleanupCallback(Consumer callback, T context) { + onCleanupCallback = callback; + onCleanupContext = context; + } + + /** + * Overridden to decrement the usage count. + */ + @SuppressWarnings("unchecked") + @Override + public void cleanup() { + Preconditions.checkState(usageCount.get() > 0); + + if (usageCount.decrementAndGet() == 0) { + super.cleanup(); + + if (onCleanupCallback != null) { + onCleanupCallback.accept(onCleanupContext); + } + } + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamTest.java index bcb1850d8c..a7abf7ca97 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamTest.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamTest.java @@ -40,27 +40,24 @@ public class FileBackedOutputStreamTest { @BeforeClass public static void staticSetup() { - File dir = new File(TEMP_DIR); - if (!dir.exists() && !dir.mkdirs()) { - throw new RuntimeException("Failed to create temp dir " + TEMP_DIR); - } + createDir(TEMP_DIR); } @AfterClass public static void staticCleanup() { - deleteTempFiles(); + deleteTempFiles(TEMP_DIR); deleteFile(TEMP_DIR); } @Before public void setup() { - deleteTempFiles(); + deleteTempFiles(TEMP_DIR); FileBackedOutputStream.REFERENCE_CACHE.clear(); } @After public void cleanup() { - deleteTempFiles(); + deleteTempFiles(TEMP_DIR); } @Test @@ -72,7 +69,7 @@ public class FileBackedOutputStreamTest { fbos.write(bytes, 1, bytes.length - 1); assertEquals("getCount", bytes.length, fbos.getCount()); - assertNull("Found unexpected temp file", findTempFileName()); + assertNull("Found unexpected temp file", findTempFileName(TEMP_DIR)); assertEquals("Size", bytes.length, fbos.asByteSource().size()); // Read bytes twice. @@ -95,13 +92,13 @@ public class FileBackedOutputStreamTest { fbos.write(bytes[0]); fbos.write(bytes, 1, 11); - String tempFileName = findTempFileName(); + String tempFileName = findTempFileName(TEMP_DIR); assertNotNull("Expected temp file created", tempFileName); fbos.write(bytes[12]); fbos.write(bytes, 13, bytes.length - 13); - assertEquals("Temp file", tempFileName, findTempFileName()); + assertEquals("Temp file", tempFileName, findTempFileName(TEMP_DIR)); assertEquals("Size", bytes.length, fbos.asByteSource().size()); InputStream inputStream = fbos.asByteSource().openStream(); @@ -121,7 +118,7 @@ public class FileBackedOutputStreamTest { assertEquals("Reference cache size", 0, FileBackedOutputStream.REFERENCE_CACHE.size()); - assertNull("Found unexpected temp file", findTempFileName()); + assertNull("Found unexpected temp file", findTempFileName(TEMP_DIR)); } LOG.info("testFileThresholdReachedWithWriteBytes ending"); @@ -135,12 +132,12 @@ public class FileBackedOutputStreamTest { fbos.write(bytes[0]); fbos.write(bytes[1]); - assertNull("Found unexpected temp file", findTempFileName()); + assertNull("Found unexpected temp file", findTempFileName(TEMP_DIR)); fbos.write(bytes[2]); fbos.flush(); - assertNotNull("Expected temp file created", findTempFileName()); + assertNotNull("Expected temp file created", findTempFileName(TEMP_DIR)); assertEquals("Size", bytes.length, fbos.asByteSource().size()); assertArrayEquals("Read bytes", bytes, fbos.asByteSource().read()); @@ -156,7 +153,7 @@ public class FileBackedOutputStreamTest { byte[] bytes = new byte[]{0, 1, 2}; fbos.write(bytes); - assertNull("Found unexpected temp file", findTempFileName()); + assertNull("Found unexpected temp file", findTempFileName(TEMP_DIR)); assertEquals("Size", bytes.length, fbos.asByteSource().size()); // Should throw IOException after call to asByteSource. @@ -172,7 +169,7 @@ public class FileBackedOutputStreamTest { try { fbos = new FileBackedOutputStream(1, TEMP_DIR); fbos.write(new byte[] {0, 1}); - assertNotNull("Expected temp file created", findTempFileName()); + assertNotNull("Expected temp file created", findTempFileName(TEMP_DIR)); } finally { if (fbos != null) { fbos.close(); @@ -183,7 +180,7 @@ public class FileBackedOutputStreamTest { Stopwatch sw = Stopwatch.createStarted(); while (sw.elapsed(TimeUnit.SECONDS) <= 20) { System.gc(); - if (findTempFileName() == null) { + if (findTempFileName(TEMP_DIR) == null) { return; } Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); @@ -192,23 +189,30 @@ public class FileBackedOutputStreamTest { fail("Temp file was not deleted"); } - private static String findTempFileName() { - String[] files = new File(TEMP_DIR).list(); + static String findTempFileName(String dirPath) { + String[] files = new File(dirPath).list(); assertNotNull(files); assertTrue("Found more than one temp file: " + Arrays.toString(files), files.length < 2); return files.length == 1 ? files[0] : null; } - private static boolean deleteFile(String file) { + static boolean deleteFile(String file) { return new File(file).delete(); } - private static void deleteTempFiles() { - String[] files = new File(TEMP_DIR).list(); + static void deleteTempFiles(String path) { + String[] files = new File(path).list(); if (files != null) { for (String file: files) { - deleteFile(TEMP_DIR + File.separator + file); + deleteFile(path + File.separator + file); } } } + + static void createDir(String path) { + File dir = new File(path); + if (!dir.exists() && !dir.mkdirs()) { + throw new RuntimeException("Failed to create temp dir " + path); + } + } } diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/io/SharedFileBackedOutputStreamTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/io/SharedFileBackedOutputStreamTest.java new file mode 100644 index 0000000000..0d6f581123 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/io/SharedFileBackedOutputStreamTest.java @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.io; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.util.function.Consumer; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Unit tests for SharedFileBackedOutputStream. + * + * @author Thomas Pantelis + */ +public class SharedFileBackedOutputStreamTest { + private static final Logger LOG = LoggerFactory.getLogger(SharedFileBackedOutputStreamTest.class); + private static final String TEMP_DIR = "target/FileBackedOutputStreamTest"; + + @BeforeClass + public static void staticSetup() { + FileBackedOutputStreamTest.createDir(TEMP_DIR); + } + + @AfterClass + public static void staticCleanup() { + FileBackedOutputStreamTest.deleteTempFiles(TEMP_DIR); + FileBackedOutputStreamTest.deleteFile(TEMP_DIR); + } + + @Before + public void setup() { + FileBackedOutputStreamTest.deleteTempFiles(TEMP_DIR); + FileBackedOutputStream.REFERENCE_CACHE.clear(); + } + + @After + public void cleanup() { + FileBackedOutputStreamTest.deleteTempFiles(TEMP_DIR); + } + + @Test + public void testSingleUsage() throws IOException { + LOG.info("testSingleUsage starting"); + try (SharedFileBackedOutputStream fbos = new SharedFileBackedOutputStream(5, TEMP_DIR)) { + byte[] bytes = new byte[]{0, 1, 2, 3, 4, 5, 6}; + fbos.write(bytes); + + assertNotNull("Expected temp file created", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR)); + fbos.cleanup(); + assertNull("Found unexpected temp file", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR)); + } + + LOG.info("testSingleUsage ending"); + } + + @SuppressWarnings("unchecked") + @Test + public void testSharing() throws IOException { + LOG.info("testSharing starting"); + try (SharedFileBackedOutputStream fbos = new SharedFileBackedOutputStream(5, TEMP_DIR)) { + String context = "context"; + Consumer mockCallback = Mockito.mock(Consumer.class); + fbos.setOnCleanupCallback(mockCallback , context); + + byte[] bytes = new byte[]{0, 1, 2, 3, 4, 5, 6}; + fbos.write(bytes); + + assertNotNull("Expected temp file created", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR)); + + fbos.incrementUsageCount(); + fbos.cleanup(); + assertNotNull("Expected temp file exists", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR)); + + fbos.incrementUsageCount(); + fbos.incrementUsageCount(); + + fbos.cleanup(); + assertNotNull("Expected temp file exists", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR)); + + fbos.cleanup(); + assertNotNull("Expected temp file exists", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR)); + + verify(mockCallback, never()).accept(context); + + fbos.cleanup(); + assertNull("Found unexpected temp file", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR)); + + verify(mockCallback).accept(context); + } + + LOG.info("testSharing ending"); + } +}