* The state of the follower's log as known by the Leader.
*/
public interface FollowerLogInformation {
+ long NO_INDEX = -1;
/**
 * Clears the LeaderInstallSnapshotState when an install snapshot is complete.
 */
void clearLeaderInstallSnapshotState();
+
+ /**
+ * Sets the index of the log entry whose payload size exceeds the maximum size for a single message and thus
+ * needs to be sliced into smaller chunks.
+ *
+ * @param index the log entry index or NO_INDEX to clear it
+ */
+ void setSlicedLogEntryIndex(long index);
+
+ /**
+ * Returns whether log entry slicing is currently in progress.
+ *
+ * @return true if slicing is currently in progress, false otherwise
+ */
+ boolean isLogEntrySlicingInProgress();
}
private LeaderInstallSnapshotState installSnapshotState;
+ private long slicedLogEntryIndex = NO_INDEX;
+
/**
* Constructs an instance.
*
@Override
public boolean setMatchIndex(long matchIndex) {
+ // If the new match index is the index of the entry currently being sliced, then we know slicing is complete
+ // and the follower received the entry and responded, so clear the slicedLogEntryIndex.
+ if (isLogEntrySlicingInProgress() && slicedLogEntryIndex == matchIndex) {
+ slicedLogEntryIndex = NO_INDEX;
+ }
+
if (this.matchIndex != matchIndex) {
this.matchIndex = matchIndex;
return true;
installSnapshotState = null;
}
+ @Override
+ public void setSlicedLogEntryIndex(long index) {
+ slicedLogEntryIndex = index;
+ }
+
+ @Override
+ public boolean isLogEntrySlicingInProgress() {
+ return slicedLogEntryIndex != NO_INDEX;
+ }
+
@Override
public String toString() {
return "FollowerLogInformationImpl [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
Consumer<ApplyState> getApplyStateConsumer();
/**
- * Creates a FileBackedOutputStream with a common configuration.
+ * Returns the {@link FileBackedOutputStreamFactory} instance with a common configuration.
*
- * @return a FileBackedOutputStream instance
+ * @return the {@link FileBackedOutputStreamFactory}
*/
@Nonnull
- FileBackedOutputStream newFileBackedOutputStream();
+ FileBackedOutputStreamFactory getFileBackedOutputStreamFactory();
/**
* Returns the RaftActorLeadershipTransferCohort if leadership transfer is in progress.
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
private final Consumer<ApplyState> applyStateConsumer;
+ private final FileBackedOutputStreamFactory fileBackedOutputStreamFactory;
+
private RaftActorLeadershipTransferCohort leadershipTransferCohort;
public RaftActorContextImpl(ActorRef actor, ActorContext context, String id,
this.log = Preconditions.checkNotNull(logger);
this.applyStateConsumer = Preconditions.checkNotNull(applyStateConsumer);
+ fileBackedOutputStreamFactory = new FileBackedOutputStreamFactory(
+ configParams.getFileBackedStreamingThreshold(), configParams.getTempFileDirectory());
+
for (Map.Entry<String, String> e: Preconditions.checkNotNull(peerAddresses).entrySet()) {
peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING));
}
}
@Override
- public FileBackedOutputStream newFileBackedOutputStream() {
- return new FileBackedOutputStream(configParams.getFileBackedStreamingThreshold(),
- configParams.getTempFileDirectory());
+ public FileBackedOutputStreamFactory getFileBackedOutputStreamFactory() {
+ return fileBackedOutputStreamFactory;
}
@SuppressWarnings("checkstyle:IllegalCatch")
OutputStream installSnapshotStream = null;
if (targetFollower != null) {
- installSnapshotStream = context.newFileBackedOutputStream();
+ installSnapshotStream = context.getFileBackedOutputStreamFactory().newInstance();
log.info("{}: Initiating snapshot capture {} to install on {}",
persistenceId(), captureSnapshot, targetFollower);
} else {
import com.google.common.base.Throwables;
import com.google.common.io.ByteSource;
import java.io.IOException;
+import java.io.ObjectOutputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.io.SharedFileBackedOutputStream;
+import org.opendaylight.controller.cluster.messaging.MessageSlicer;
+import org.opendaylight.controller.cluster.messaging.SliceOptions;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
*/
private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
+ /**
+ * Map of serialized AppendEntries output streams keyed by log index. This is used in conjunction with the
+ * appendEntriesMessageSlicer for slicing single ReplicatedLogEntry payloads that exceed the message size threshold.
+ * This Map allows the SharedFileBackedOutputStreams to be reused for multiple followers.
+ */
+ private final Map<Long, SharedFileBackedOutputStream> sharedSerializedAppendEntriesStreams = new HashMap<>();
+ private final MessageSlicer appendEntriesMessageSlicer;
+
private Cancellable heartbeatSchedule = null;
private Optional<SnapshotHolder> snapshotHolder = Optional.absent();
private int minReplicationCount;
@Nullable AbstractLeader initializeFromLeader) {
super(context, state);
+ appendEntriesMessageSlicer = MessageSlicer.builder().logContext(logName())
+ .messageSliceSize(context.getConfigParams().getSnapshotChunkSize())
+ .expireStateAfterInactivity(context.getConfigParams().getElectionTimeOutInterval().toMillis() * 3,
+ TimeUnit.MILLISECONDS).build();
+
if (initializeFromLeader != null) {
followerToLog.putAll(initializeFromLeader.followerToLog);
snapshotHolder = initializeFromLeader.snapshotHolder;
public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
Preconditions.checkNotNull(sender, "sender should not be null");
+ if (appendEntriesMessageSlicer.handleMessage(message)) {
+ return this;
+ }
+
if (message instanceof RaftRPC) {
RaftRPC rpc = (RaftRPC) message;
// If RPC request or response contains term T > currentTerm:
// we send a heartbeat even if we have not received a reply for the last chunk
sendAppendEntries = true;
}
+ } else if (followerLogInformation.isLogEntrySlicingInProgress()) {
+ sendAppendEntries = sendHeartbeat;
} else {
long leaderLastIndex = context.getReplicatedLog().lastIndex();
long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
followerNextIndex, followerId);
if (followerLogInformation.okToReplicate()) {
- // Try to send all the entries in the journal but not exceeding the max data size
- // for a single AppendEntries message.
- int maxEntries = (int) context.getReplicatedLog().size();
- entries = context.getReplicatedLog().getFrom(followerNextIndex, maxEntries,
- context.getConfigParams().getSnapshotChunkSize());
+ entries = getEntriesToSend(followerLogInformation, followerActor);
sendAppendEntries = true;
}
} else if (isFollowerActive && followerNextIndex >= 0
}
}
+ private List<ReplicatedLogEntry> getEntriesToSend(FollowerLogInformation followerLogInfo,
+ ActorSelection followerActor) {
+ // Try to get all the entries in the journal but not exceeding the max data size for a single AppendEntries
+ // message.
+ int maxEntries = (int) context.getReplicatedLog().size();
+ final int maxDataSize = context.getConfigParams().getSnapshotChunkSize();
+ final long followerNextIndex = followerLogInfo.getNextIndex();
+ List<ReplicatedLogEntry> entries = context.getReplicatedLog().getFrom(followerNextIndex,
+ maxEntries, maxDataSize);
+
+ // The call above returns at least the first entry even if its size exceeds the max data size threshold. If
+ // that is the case, then we need to slice it into smaller chunks.
+ if (!(entries.size() == 1 && entries.get(0).getData().size() > maxDataSize)) {
+ // Don't need to slice.
+ return entries;
+ }
+
+ log.debug("{}: Log entry size {} exceeds max payload size {}", logName(), entries.get(0).getData().size(),
+ maxDataSize);
+
+ // If an AppendEntries has already been serialized for the log index then reuse the
+ // SharedFileBackedOutputStream.
+ final Long logIndex = entries.get(0).getIndex();
+ SharedFileBackedOutputStream fileBackedStream = sharedSerializedAppendEntriesStreams.get(logIndex);
+ if (fileBackedStream == null) {
+ fileBackedStream = context.getFileBackedOutputStreamFactory().newSharedInstance();
+
+ final AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
+ getLogEntryIndex(followerNextIndex - 1), getLogEntryTerm(followerNextIndex - 1), entries,
+ context.getCommitIndex(), getReplicatedToAllIndex(), context.getPayloadVersion());
+
+ log.debug("{}: Serializing {} for slicing for follower {}", logName(), appendEntries,
+ followerLogInfo.getId());
+
+ try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
+ out.writeObject(appendEntries);
+ } catch (IOException e) {
+ log.error("{}: Error serializing {}", logName(), appendEntries, e);
+ fileBackedStream.cleanup();
+ return Collections.emptyList();
+ }
+
+ sharedSerializedAppendEntriesStreams.put(logIndex, fileBackedStream);
+
+ fileBackedStream.setOnCleanupCallback(index -> {
+ log.debug("{}: On SharedFileBackedOutputStream cleanup for index {}", logName(), index);
+ sharedSerializedAppendEntriesStreams.remove(index);
+ }, logIndex);
+ } else {
+ log.debug("{}: Reusing SharedFileBackedOutputStream for follower {}", logName(), followerLogInfo.getId());
+ fileBackedStream.incrementUsageCount();
+ }
+
+ log.debug("{}: Slicing stream for index {}, follower {}", logName(), logIndex, followerLogInfo.getId());
+
+ // Record that slicing is in progress for the follower.
+ followerLogInfo.setSlicedLogEntryIndex(logIndex);
+
+ final FollowerIdentifier identifier = new FollowerIdentifier(followerLogInfo.getId());
+ appendEntriesMessageSlicer.slice(SliceOptions.builder().identifier(identifier)
+ .fileBackedOutputStream(fileBackedStream).sendTo(followerActor).replyTo(actor())
+ .onFailureCallback(failure -> {
+ log.error("{}: Error slicing AppendEntries for follower {}", logName(),
+ followerLogInfo.getId(), failure);
+ followerLogInfo.setSlicedLogEntryIndex(FollowerLogInformation.NO_INDEX);
+ }).build());
+
+ return Collections.emptyList();
+ }
+
private void sendAppendEntriesToFollower(ActorSelection followerActor, List<ReplicatedLogEntry> entries,
FollowerLogInformation followerLogInformation) {
// In certain cases outlined below we don't want to send the actual commit index to prevent the follower from
// empty AppendEntries as a heart beat to prevent election.
// - if we're in the process of installing a snapshot. In this case we don't send any new entries but still
// need to send AppendEntries to prevent election.
+ // - if we're in the process of slicing an AppendEntries with a large log entry payload. In this case we
+ // need to send an empty AppendEntries to prevent election.
boolean isInstallingSnaphot = followerLogInformation.getInstallSnapshotState() != null;
- long leaderCommitIndex = isInstallingSnaphot || !followerLogInformation.isFollowerActive() ? -1 :
- context.getCommitIndex();
+ long leaderCommitIndex = isInstallingSnaphot || followerLogInformation.isLogEntrySlicingInProgress()
+ || !followerLogInformation.isFollowerActive() ? -1 : context.getCommitIndex();
long followerNextIndex = followerLogInformation.getNextIndex();
AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
if (!followerToLog.isEmpty()) {
log.trace("{}: Sending heartbeat", logName());
sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis(), true);
+
+ appendEntriesMessageSlicer.checkExpiredSlicedMessageState();
}
}
@Override
public void close() {
stopHeartBeat();
+ appendEntriesMessageSlicer.close();
}
@Override
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.messaging.MessageAssembler;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
private final SyncStatusTracker initialSyncStatusTracker;
+ private final MessageAssembler appendEntriesMessageAssembler;
+
private final Stopwatch lastLeaderMessageTimer = Stopwatch.createStarted();
private SnapshotTracker snapshotTracker = null;
private String leaderId;
initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), context.getConfigParams()
.getSyncIndexThreshold());
+ appendEntriesMessageAssembler = MessageAssembler.builder().logContext(logName())
+ .filedBackedStreamFactory(context.getFileBackedOutputStreamFactory())
+ .assembledMessageCallback((message, sender) -> handleMessage(sender, message)).build();
+
if (context.getPeerIds().isEmpty() && getLeaderId() == null) {
actor().tell(TimeoutNow.INSTANCE, actor());
} else {
super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex());
}
+ appendEntriesMessageAssembler.checkExpiredAssembledMessageState();
+
return this;
}
return handleElectionTimeout(message);
}
+ if (appendEntriesMessageAssembler.handleMessage(message, actor())) {
+ return this;
+ }
+
if (!(message instanceof RaftRPC)) {
// The rest of the processing requires the message to be a RaftRPC
return null;
public void close() {
closeSnapshotTracker();
stopElection();
+ appendEntriesMessageAssembler.close();
}
@VisibleForTesting
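For orientation, here is a minimal sketch of how the leader-side MessageSlicer and the follower-side MessageAssembler pair up, using only the builder calls shown in the hunks above; the names chunkSize, electionTimeoutMillis, streamWithSerializedMessage, streamFactory, followerActor, leaderActor, and process are illustrative assumptions, not part of the patch:

    // Leader side: the slicer chops a serialized message into MessageSlice chunks.
    MessageSlicer slicer = MessageSlicer.builder().logContext("leader")
            .messageSliceSize(chunkSize)
            .expireStateAfterInactivity(electionTimeoutMillis * 3, TimeUnit.MILLISECONDS).build();

    slicer.slice(SliceOptions.builder().identifier(new FollowerIdentifier("follower1"))
            .fileBackedOutputStream(streamWithSerializedMessage)
            .sendTo(followerActor).replyTo(leaderActor)
            .onFailureCallback(failure -> log.error("Slicing failed", failure)).build());

    // Follower side: the assembler acks each MessageSlice with a MessageSliceReply and
    // invokes the callback once the original message has been reassembled.
    MessageAssembler assembler = MessageAssembler.builder().logContext("follower")
            .filedBackedStreamFactory(streamFactory)
            .assembledMessageCallback((message, sender) -> process(message, sender)).build();

    // Both sides route slicing messages before normal RaftRPC processing:
    // slicer.handleMessage(message) on the leader, assembler.handleMessage(message, actor())
    // on the follower.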
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.yangtools.util.AbstractStringIdentifier;
+
+/**
+ * An Identifier for a follower.
+ *
+ * @author Thomas Pantelis
+ */
+class FollowerIdentifier extends AbstractStringIdentifier<FollowerIdentifier> {
+ private static final long serialVersionUID = 1L;
+
+ FollowerIdentifier(String followerId) {
+ super(followerId);
+ }
+
+ private Object writeReplace() {
+ return new Proxy(this);
+ }
+
+ private static class Proxy implements Externalizable {
+ private static final long serialVersionUID = 1L;
+
+ private FollowerIdentifier identifier;
+
+ // checkstyle flags the public modifier as redundant, but it isn't: it is explicitly needed for Java
+ // serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public Proxy() {
+ }
+
+ Proxy(FollowerIdentifier identifier) {
+ this.identifier = identifier;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(identifier.getValue());
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ identifier = new FollowerIdentifier((String) in.readObject());
+ }
+
+ private Object readResolve() {
+ return identifier;
+ }
+ }
+}
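The nested Proxy implements the standard Java serialization proxy pattern: writeReplace() substitutes the Proxy when an identifier is written and readResolve() restores the identifier when it is read, keeping the wire format stable regardless of the identifier class's internals. A round trip therefore yields an equal instance; an illustrative sketch using commons-lang SerializationUtils:

    FollowerIdentifier expected = new FollowerIdentifier("follower1");
    byte[] bytes = SerializationUtils.serialize(expected);          // writes the Proxy via writeReplace()
    FollowerIdentifier actual = (FollowerIdentifier) SerializationUtils.deserialize(bytes);
    assertEquals(expected, actual);                                 // readResolve() restored the identifier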
this.log = log;
this.totalChunks = totalChunks;
this.leaderId = Preconditions.checkNotNull(leaderId);
- fileBackedStream = context.newFileBackedOutputStream();
+ fileBackedStream = context.getFileBackedOutputStreamFactory().newInstance();
bufferedStream = new BufferedOutputStream(fileBackedStream);
}
return serializedSize;
}
+ @Override
+ public int hashCode() {
+ return serverConfig.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj == null) {
+ return false;
+ }
+
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+
+ ServerConfigurationPayload other = (ServerConfigurationPayload) obj;
+ return serverConfig.equals(other.serverConfig);
+ }
+
@Override
public String toString() {
return "ServerConfigurationPayload [serverConfig=" + serverConfig + "]";
protected long currentTerm;
protected int snapshotBatchCount = 4;
+ protected int snapshotChunkSize = SNAPSHOT_CHUNK_SIZE;
protected List<MockPayload> expSnapshotState = new ArrayList<>();
configParams.setSnapshotBatchCount(snapshotBatchCount);
configParams.setSnapshotDataThresholdPercentage(70);
configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
- configParams.setSnapshotChunkSize(SNAPSHOT_CHUNK_SIZE);
+ configParams.setSnapshotChunkSize(snapshotChunkSize);
return configParams;
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+
+/**
+ * Tests end-to-end replication of sliced log entry payloads, i.e. entries whose size exceeds the maximum size for a
+ * single AppendEntries message.
+ *
+ * @author Thomas Pantelis
+ */
+public class ReplicationWithSlicedPayloadIntegrationTest extends AbstractRaftActorIntegrationTest {
+
+ @Test
+ public void runTest() throws Exception {
+ testLog.info("ReplicationWithSlicedPayloadIntegrationTest starting");
+
+ // Create the leader and 2 follower actors.
+
+ snapshotChunkSize = 20;
+
+ DefaultConfigParamsImpl followerConfigParams = newFollowerConfigParams();
+ followerConfigParams.setSnapshotBatchCount(snapshotBatchCount);
+ follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+ follower2Id, testActorPath(follower2Id)), followerConfigParams);
+
+ follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+ follower1Id, testActorPath(follower1Id)), followerConfigParams);
+
+ peerAddresses = ImmutableMap.<String, String>builder()
+ .put(follower1Id, follower1Actor.path().toString())
+ .put(follower2Id, follower2Actor.path().toString()).build();
+
+ leaderConfigParams = newLeaderConfigParams();
+ leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
+
+ follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
+ follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
+ leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
+
+ leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+
+ waitUntilLeader(leaderActor);
+
+ currentTerm = leaderContext.getTermInformation().getCurrentTerm();
+
+ // Send a large payload that exceeds the size threshold and needs to be sliced.
+
+ MockPayload largePayload = sendPayloadData(leaderActor, "large", snapshotChunkSize + 1);
+
+ // Then send a small payload that does not need to be sliced.
+
+ MockPayload smallPayload = sendPayloadData(leaderActor, "normal", snapshotChunkSize - 1);
+
+ final List<ApplyState> leaderApplyState = expectMatching(leaderCollectorActor, ApplyState.class, 2);
+ verifyApplyState(leaderApplyState.get(0), leaderCollectorActor,
+ largePayload.toString(), currentTerm, 0, largePayload);
+ verifyApplyState(leaderApplyState.get(1), leaderCollectorActor,
+ smallPayload.toString(), currentTerm, 1, smallPayload);
+
+ final List<ApplyState> follower1ApplyState = expectMatching(follower1CollectorActor, ApplyState.class, 2);
+ verifyApplyState(follower1ApplyState.get(0), null, null, currentTerm, 0, largePayload);
+ verifyApplyState(follower1ApplyState.get(1), null, null, currentTerm, 1, smallPayload);
+
+ final List<ApplyState> follower2ApplyState = expectMatching(follower2CollectorActor, ApplyState.class, 2);
+ verifyApplyState(follower2ApplyState.get(0), null, null, currentTerm, 0, largePayload);
+ verifyApplyState(follower2ApplyState.get(1), null, null, currentTerm, 1, smallPayload);
+
+ testLog.info("ReplicationWithSlicedPayloadIntegrationTest ending");
+ }
+}
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
import org.opendaylight.controller.cluster.raft.SnapshotManager.LastAppliedTermInformationReader;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
doReturn(5L).when(mockElectionTerm).getCurrentTerm();
doReturn("member5").when(mockElectionTerm).getVotedFor();
- doReturn(new FileBackedOutputStream(10000000, "target")).when(mockRaftActorContext).newFileBackedOutputStream();
+ doReturn(new FileBackedOutputStreamFactory(10000000, "target"))
+ .when(mockRaftActorContext).getFileBackedOutputStreamFactory();
snapshotManager = new SnapshotManager(mockRaftActorContext, LoggerFactory.getLogger(this.getClass()));
factory = new TestActorFactory(getSystem());
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+
+/**
+ * Unit tests for FollowerIdentifier.
+ *
+ * @author Thomas Pantelis
+ */
+public class FollowerIdentifierTest {
+
+ @Test
+ public void testSerialization() {
+ FollowerIdentifier expected = new FollowerIdentifier("follower1");
+ FollowerIdentifier cloned = (FollowerIdentifier) SerializationUtils.clone(expected);
+ assertEquals("cloned", expected, cloned);
+ }
+}
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.SerializationUtils;
import org.junit.After;
import org.junit.Test;
+import org.opendaylight.controller.cluster.messaging.MessageSlice;
+import org.opendaylight.controller.cluster.messaging.MessageSliceReply;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.yangtools.concepts.Identifier;
}
private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
- MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
+ return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
+ }
+
+ private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index, Payload payload) {
SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
actorContext.getReplicatedLog().append(newEntry);
return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
}
+ @Test
+ public void testReplicationWithPayloadSizeThatExceedsThreshold() {
+ logStart("testReplicationWithPayloadSizeThatExceedsThreshold");
+
+ final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1,
+ Arrays.asList(new SimpleReplicatedLogEntry(0, 1,
+ new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length;
+ final MockRaftActorContext.MockPayload largePayload =
+ new MockRaftActorContext.MockPayload("large", serializedSize);
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(300, TimeUnit.MILLISECONDS));
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50);
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ leaderActorContext.setCommitIndex(-1);
+ leaderActorContext.setLastApplied(-1);
+
+ leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
+
+ // Send initial heartbeat reply so follower is marked active
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Send normal payload first to prime commit index.
+ final long term = leaderActorContext.getTermInformation().getCurrentTerm();
+ sendReplicate(leaderActorContext, term, 0);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex());
+
+ leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0));
+ assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex());
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced.
+ sendReplicate(leaderActorContext, term, 1, largePayload);
+
+ MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+ assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex());
+ assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices());
+
+ final Identifier slicingId = messageSlice.getIdentifier();
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+ assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+ assertEquals("Entries size", 0, appendEntries.getEntries().size());
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress.
+
+ // Sleep for the heartbeat interval so AppendEntries is sent.
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+ .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+ assertEquals("Entries size", 0, appendEntries.getEntries().size());
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Simulate the MessageSliceReply's and AppendEntriesReply from the follower.
+
+ leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor));
+ messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+ assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex());
+
+ leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor));
+
+ leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0));
+
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Send another normal payload.
+
+ sendReplicate(leaderActorContext, term, 2);
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit());
+ }
+
+ @Test
+ public void testLargePayloadSlicingExpiration() {
+ logStart("testLargePayloadSlicingExpiration");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(100, TimeUnit.MILLISECONDS));
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10);
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ leaderActorContext.setCommitIndex(-1);
+ leaderActorContext.setLastApplied(-1);
+
+ final long term = leaderActorContext.getTermInformation().getCurrentTerm();
+ leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
+
+ // Send initial heartbeat reply so follower is marked active
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large",
+ leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1));
+ MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+
+ // Sleep for at least 3 * election timeout so the slicing state expires.
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+ .getElectionTimeOutInterval().toMillis() * 3 + 50, TimeUnit.MILLISECONDS);
+ MessageCollectorActor.clearMessages(followerActor);
+
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+ assertEquals("Entries size", 0, appendEntries.getEntries().size());
+
+ MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300);
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Send an AppendEntriesReply - this should restart the slicing.
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+ .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0));
+
+ MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+ }
+
@Override
protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
ActorRef actorRef, RaftRPC rpc) throws Exception {
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
chunk3 = getNextChunk(byteString, 20, byteString.size());
fbos = spy(new FileBackedOutputStream(100000000, "target"));
- doReturn(fbos).when(mockContext).newFileBackedOutputStream();
+ FileBackedOutputStreamFactory mockFactory = mock(FileBackedOutputStreamFactory.class);
+ doReturn(fbos).when(mockFactory).newInstance();
+ doReturn(mockFactory).when(mockContext).getFileBackedOutputStreamFactory();
}
@Test
public FileBackedOutputStream newInstance() {
return new FileBackedOutputStream(fileThreshold, fileDirectory);
}
+
+ /**
+ * Creates a new {@link SharedFileBackedOutputStream} with the settings configured for this factory.
+ *
+ * @return a {@link SharedFileBackedOutputStream} instance
+ */
+ public SharedFileBackedOutputStream newSharedInstance() {
+ return new SharedFileBackedOutputStream(fileThreshold, fileDirectory);
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.io;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+/**
+ * A FileBackedOutputStream that allows for sharing in that it maintains a usage count and the backing file isn't
+ * deleted until the usage count reaches 0. The usage count is initialized to 1 on construction. Subsequent users of
+ * the instance must call {@link #incrementUsageCount()}. The {@link #cleanup()} method decrements the usage count and,
+ * when it reaches 0, {@link FileBackedOutputStream#cleanup()} is called to delete the backing file.
+ *
+ * @author Thomas Pantelis
+ */
+public class SharedFileBackedOutputStream extends FileBackedOutputStream {
+ private final AtomicInteger usageCount = new AtomicInteger(1);
+ @SuppressWarnings("rawtypes")
+ private Consumer onCleanupCallback;
+ private Object onCleanupContext;
+
+ public SharedFileBackedOutputStream(int fileThreshold, String fileDirectory) {
+ super(fileThreshold, fileDirectory);
+ }
+
+ /**
+ * Increments the usage count. This must be followed by a corresponding call to {@link #cleanup()} when this
+ * instance is no longer needed.
+ */
+ public void incrementUsageCount() {
+ usageCount.getAndIncrement();
+ }
+
+ /**
+ * Returns the current usage count.
+ *
+ * @return the current usage count
+ */
+ public int getUsageCount() {
+ return usageCount.get();
+ }
+
+ /**
+ * Sets the callback to be notified when {@link FileBackedOutputStream#cleanup()} is called to delete the backing
+ * file.
+ *
+ * @param callback the callback to invoke after the backing file is deleted
+ * @param context the context value to pass to the callback
+ */
+ public <T> void setOnCleanupCallback(Consumer<T> callback, T context) {
+ onCleanupCallback = callback;
+ onCleanupContext = context;
+ }
+
+ /**
+ * Overridden to decrement the usage count.
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public void cleanup() {
+ Preconditions.checkState(usageCount.get() > 0);
+
+ if (usageCount.decrementAndGet() == 0) {
+ super.cleanup();
+
+ if (onCleanupCallback != null) {
+ onCleanupCallback.accept(onCleanupContext);
+ }
+ }
+ }
+}
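In brief, the sharing contract works as follows; an illustrative sketch, assuming factory, cache, and logIndex as in the leader code earlier in this patch:

    SharedFileBackedOutputStream stream = factory.newSharedInstance();   // usage count starts at 1
    stream.setOnCleanupCallback(index -> cache.remove(index), logIndex); // fires after the final cleanup
    stream.incrementUsageCount();                                        // a second consumer shares the stream
    stream.cleanup();   // count 2 -> 1: backing file is kept
    stream.cleanup();   // count 1 -> 0: file deleted, callback invoked with logIndex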
@BeforeClass
public static void staticSetup() {
- File dir = new File(TEMP_DIR);
- if (!dir.exists() && !dir.mkdirs()) {
- throw new RuntimeException("Failed to create temp dir " + TEMP_DIR);
- }
+ createDir(TEMP_DIR);
}
@AfterClass
public static void staticCleanup() {
- deleteTempFiles();
+ deleteTempFiles(TEMP_DIR);
deleteFile(TEMP_DIR);
}
@Before
public void setup() {
- deleteTempFiles();
+ deleteTempFiles(TEMP_DIR);
FileBackedOutputStream.REFERENCE_CACHE.clear();
}
@After
public void cleanup() {
- deleteTempFiles();
+ deleteTempFiles(TEMP_DIR);
}
@Test
fbos.write(bytes, 1, bytes.length - 1);
assertEquals("getCount", bytes.length, fbos.getCount());
- assertNull("Found unexpected temp file", findTempFileName());
+ assertNull("Found unexpected temp file", findTempFileName(TEMP_DIR));
assertEquals("Size", bytes.length, fbos.asByteSource().size());
// Read bytes twice.
fbos.write(bytes[0]);
fbos.write(bytes, 1, 11);
- String tempFileName = findTempFileName();
+ String tempFileName = findTempFileName(TEMP_DIR);
assertNotNull("Expected temp file created", tempFileName);
fbos.write(bytes[12]);
fbos.write(bytes, 13, bytes.length - 13);
- assertEquals("Temp file", tempFileName, findTempFileName());
+ assertEquals("Temp file", tempFileName, findTempFileName(TEMP_DIR));
assertEquals("Size", bytes.length, fbos.asByteSource().size());
InputStream inputStream = fbos.asByteSource().openStream();
assertEquals("Reference cache size", 0, FileBackedOutputStream.REFERENCE_CACHE.size());
- assertNull("Found unexpected temp file", findTempFileName());
+ assertNull("Found unexpected temp file", findTempFileName(TEMP_DIR));
}
LOG.info("testFileThresholdReachedWithWriteBytes ending");
fbos.write(bytes[0]);
fbos.write(bytes[1]);
- assertNull("Found unexpected temp file", findTempFileName());
+ assertNull("Found unexpected temp file", findTempFileName(TEMP_DIR));
fbos.write(bytes[2]);
fbos.flush();
- assertNotNull("Expected temp file created", findTempFileName());
+ assertNotNull("Expected temp file created", findTempFileName(TEMP_DIR));
assertEquals("Size", bytes.length, fbos.asByteSource().size());
assertArrayEquals("Read bytes", bytes, fbos.asByteSource().read());
byte[] bytes = new byte[]{0, 1, 2};
fbos.write(bytes);
- assertNull("Found unexpected temp file", findTempFileName());
+ assertNull("Found unexpected temp file", findTempFileName(TEMP_DIR));
assertEquals("Size", bytes.length, fbos.asByteSource().size());
// Should throw IOException after call to asByteSource.
try {
fbos = new FileBackedOutputStream(1, TEMP_DIR);
fbos.write(new byte[] {0, 1});
- assertNotNull("Expected temp file created", findTempFileName());
+ assertNotNull("Expected temp file created", findTempFileName(TEMP_DIR));
} finally {
if (fbos != null) {
fbos.close();
Stopwatch sw = Stopwatch.createStarted();
while (sw.elapsed(TimeUnit.SECONDS) <= 20) {
System.gc();
- if (findTempFileName() == null) {
+ if (findTempFileName(TEMP_DIR) == null) {
return;
}
Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
fail("Temp file was not deleted");
}
- private static String findTempFileName() {
- String[] files = new File(TEMP_DIR).list();
+ static String findTempFileName(String dirPath) {
+ String[] files = new File(dirPath).list();
assertNotNull(files);
assertTrue("Found more than one temp file: " + Arrays.toString(files), files.length < 2);
return files.length == 1 ? files[0] : null;
}
- private static boolean deleteFile(String file) {
+ static boolean deleteFile(String file) {
return new File(file).delete();
}
- private static void deleteTempFiles() {
- String[] files = new File(TEMP_DIR).list();
+ static void deleteTempFiles(String path) {
+ String[] files = new File(path).list();
if (files != null) {
for (String file: files) {
- deleteFile(TEMP_DIR + File.separator + file);
+ deleteFile(path + File.separator + file);
}
}
}
+
+ static void createDir(String path) {
+ File dir = new File(path);
+ if (!dir.exists() && !dir.mkdirs()) {
+ throw new RuntimeException("Failed to create temp dir " + path);
+ }
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.io;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for SharedFileBackedOutputStream.
+ *
+ * @author Thomas Pantelis
+ */
+public class SharedFileBackedOutputStreamTest {
+ private static final Logger LOG = LoggerFactory.getLogger(SharedFileBackedOutputStreamTest.class);
+ private static final String TEMP_DIR = "target/FileBackedOutputStreamTest";
+
+ @BeforeClass
+ public static void staticSetup() {
+ FileBackedOutputStreamTest.createDir(TEMP_DIR);
+ }
+
+ @AfterClass
+ public static void staticCleanup() {
+ FileBackedOutputStreamTest.deleteTempFiles(TEMP_DIR);
+ FileBackedOutputStreamTest.deleteFile(TEMP_DIR);
+ }
+
+ @Before
+ public void setup() {
+ FileBackedOutputStreamTest.deleteTempFiles(TEMP_DIR);
+ FileBackedOutputStream.REFERENCE_CACHE.clear();
+ }
+
+ @After
+ public void cleanup() {
+ FileBackedOutputStreamTest.deleteTempFiles(TEMP_DIR);
+ }
+
+ @Test
+ public void testSingleUsage() throws IOException {
+ LOG.info("testSingleUsage starting");
+ try (SharedFileBackedOutputStream fbos = new SharedFileBackedOutputStream(5, TEMP_DIR)) {
+ byte[] bytes = new byte[]{0, 1, 2, 3, 4, 5, 6};
+ fbos.write(bytes);
+
+ assertNotNull("Expected temp file created", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR));
+ fbos.cleanup();
+ assertNull("Found unexpected temp file", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR));
+ }
+
+ LOG.info("testSingleUsage ending");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testSharing() throws IOException {
+ LOG.info("testSharing starting");
+ try (SharedFileBackedOutputStream fbos = new SharedFileBackedOutputStream(5, TEMP_DIR)) {
+ String context = "context";
+ Consumer<String> mockCallback = Mockito.mock(Consumer.class);
+ fbos.setOnCleanupCallback(mockCallback, context);
+
+ byte[] bytes = new byte[]{0, 1, 2, 3, 4, 5, 6};
+ fbos.write(bytes);
+
+ assertNotNull("Expected temp file created", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR));
+
+ fbos.incrementUsageCount();
+ fbos.cleanup();
+ assertNotNull("Expected temp file exists", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR));
+
+ fbos.incrementUsageCount();
+ fbos.incrementUsageCount();
+
+ fbos.cleanup();
+ assertNotNull("Expected temp file exists", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR));
+
+ fbos.cleanup();
+ assertNotNull("Expected temp file exists", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR));
+
+ verify(mockCallback, never()).accept(context);
+
+ fbos.cleanup();
+ assertNull("Found unexpected temp file", FileBackedOutputStreamTest.findTempFileName(TEMP_DIR));
+
+ verify(mockCallback).accept(context);
+ }
+
+ LOG.info("testSharing ending");
+ }
+}