X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FReplicatedLog.java;h=8cf133c2ab73ba2c62a9b177d3ea26d802e06abe;hp=c17f5448c6e256a97c4f7134959bb6c2d88a0971;hb=ff29db5dc6012f77bbe53f57ddce929b0de093b3;hpb=e71922c94cec22e9f37648a2d04bf2eb3274cf2f diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java index c17f5448c6..8cf133c2ab 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java @@ -5,101 +5,127 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.raft; -import com.google.protobuf.ByteString; - import java.util.List; +import java.util.function.Consumer; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; /** - * Represents the ReplicatedLog that needs to be kept in sync by the RaftActor + * Represents the ReplicatedLog that needs to be kept in sync by the RaftActor. */ public interface ReplicatedLog { + long NO_MAX_SIZE = -1; + /** - * Get a replicated log entry at the specified index + * Return the replicated log entry at the specified index. * * @param index the index of the log entry - * @return the ReplicatedLogEntry at index. null if index less than 0 or - * greater than the size of the in-memory journal. + * @return the ReplicatedLogEntry if found, otherwise null if the adjusted index less than 0 or + * greater than the size of the in-memory journal */ - ReplicatedLogEntry get(long index); - + @Nullable ReplicatedLogEntry get(long index); /** - * Get the last replicated log entry + * Return the last replicated log entry in the log or null of not found. * - * @return + * @return the last replicated log entry in the log or null of not found. */ - ReplicatedLogEntry last(); + @Nullable ReplicatedLogEntry last(); /** + * Return the index of the last entry in the log or -1 if the log is empty. * - * @return + * @return the index of the last entry in the log or -1 if the log is empty. */ long lastIndex(); /** + * Return the term of the last entry in the log or -1 if the log is empty. * - * @return + * @return the term of the last entry in the log or -1 if the log is empty. */ long lastTerm(); /** - * To be called when we need to remove entries from the in-memory log. - * This method will remove all entries >= index. This method should be used - * during recovery to appropriately trim the log based on persisted - * information + * Removes entries from the in-memory log starting at the given index. * - * @param index the index of the log entry + * @param index the index of the first log entry to remove + * @return the adjusted index of the first log entry removed or -1 if the log entry is not found. */ - void removeFrom(long index); - + long removeFrom(long index); /** - * To be called when we need to remove entries from the in-memory log and we - * need that information persisted to disk. This method will remove all - * entries >= index. + * Removes entries from the in-memory log and the persisted log starting at the given index. + * *

* The persisted information would then be used during recovery to properly * reconstruct the state of the in-memory replicated log * - * @param index the index of the log entry + * @param index the index of the first log entry to remove + * @return true if entries were removed, false otherwise + */ + boolean removeFromAndPersist(long index); + + /** + * Appends an entry to the log. + * + * @param replicatedLogEntry the entry to append + * @return true if the entry was successfully appended, false otherwise. An entry can fail to append if + * the index is already included in the log. */ - void removeFromAndPersist(long index); + boolean append(ReplicatedLogEntry replicatedLogEntry); /** - * Append an entry to the log - * @param replicatedLogEntry + * Optimization method to increase the capacity of the journal log prior to appending entries. + * + * @param amount the amount to increase by */ - void append(ReplicatedLogEntry replicatedLogEntry); + void increaseJournalLogCapacity(int amount); /** + * Appends an entry to the in-memory log and persists it as well. * - * @param replicatedLogEntry + * @param replicatedLogEntry the entry to append + * @param callback the callback to be notified when persistence is complete (optional). + * @param doAsync if true, the persistent actor can receive subsequent messages to process in between the persist + * call and the execution of the associated callback. If false, subsequent messages are stashed and get + * delivered after persistence is complete and the associated callback is executed. In either case the + * callback is guaranteed to execute in the context of the actor associated with this log. + * @return true if the entry was successfully appended, false otherwise. */ - void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry); + boolean appendAndPersist(@NonNull ReplicatedLogEntry replicatedLogEntry, + @Nullable Consumer callback, boolean doAsync); /** + * Returns a list of log entries starting from the given index to the end of the log. * - * @param index the index of the log entry + * @param index the index of the first log entry to get. + * @return the List of entries */ - List getFrom(long index); + @NonNull List getFrom(long index); /** + * Returns a list of log entries starting from the given index up to the given maximum of entries or + * the given maximum accumulated size, whichever comes first. * - * @param index the index of the log entry + * @param index the index of the first log entry to get + * @param maxEntries the maximum number of entries to get + * @param maxDataSize the maximum accumulated size of the log entries to get + * @return the List of entries meeting the criteria. */ - List getFrom(long index, int max); + @NonNull List getFrom(long index, int maxEntries, long maxDataSize); /** + * Returns the number of entries in the journal. * - * @return + * @return the number of entries */ long size(); /** - * Checks if the entry at the specified index is present or not + * Checks if the entry at the specified index is present or not. * * @param index the index of the log entry * @return true if the entry is present in the in-memory journal @@ -107,78 +133,98 @@ public interface ReplicatedLog { boolean isPresent(long index); /** - * Checks if the entry is present in a snapshot + * Checks if the entry is present in a snapshot. * * @param index the index of the log entry - * @return true if the entry is in the snapshot. false if the entry is not - * in the snapshot even if the entry may be present in the replicated log + * @return true if the entry is in the snapshot. false if the entry is not in the snapshot even if the entry may + * be present in the replicated log */ boolean isInSnapshot(long index); /** - * Get the snapshot - * - * @return an object representing the snapshot if it exists. null otherwise - */ - ByteString getSnapshot(); - - /** - * Get the index of the snapshot + * Returns the index of the snapshot. * * @return the index from which the snapshot was created. -1 otherwise. */ long getSnapshotIndex(); /** - * Get the term of the snapshot + * Returns the term of the snapshot. * - * @return the term of the index from which the snapshot was created. -1 - * otherwise + * @return the term of the index from which the snapshot was created. -1 otherwise */ long getSnapshotTerm(); /** - * sets the snapshot index in the replicated log - * @param snapshotIndex + * Sets the snapshot index in the replicated log. + * + * @param snapshotIndex the index to set */ void setSnapshotIndex(long snapshotIndex); /** - * sets snapshot term - * @param snapshotTerm + * Sets snapshot term. + * + * @param snapshotTerm the term to set + */ + void setSnapshotTerm(long snapshotTerm); + + /** + * Clears the journal entries with startIndex (inclusive) and endIndex (exclusive). + * + * @param startIndex the start index (inclusive) + * @param endIndex the end index (exclusive) + */ + void clear(int startIndex, int endIndex); + + /** + * Handles all the bookkeeping in order to perform a rollback in the event of SaveSnapshotFailure. + * + * @param snapshotCapturedIndex the new snapshot index + * @param snapshotCapturedTerm the new snapshot term */ - public void setSnapshotTerm(long snapshotTerm); + void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm); /** - * sets the snapshot in bytes - * @param snapshot + * Sets the Replicated log to state after snapshot success. This method is equivalent to + * {@code snapshotCommit(true)}. */ - public void setSnapshot(ByteString snapshot); + default void snapshotCommit() { + snapshotCommit(true); + } /** - * Clears the journal entries with startIndex(inclusive) and endIndex (exclusive) - * @param startIndex - * @param endIndex + * Sets the Replicated log to state after snapshot success. Most users will want to use {@link #snapshotCommit()} + * instead. + * + * @param updateDataSize true if {@link #dataSize()} should also be updated */ - public void clear(int startIndex, int endIndex); + void snapshotCommit(boolean updateDataSize); /** - * Handles all the bookkeeping in order to perform a rollback in the - * event of SaveSnapshotFailure - * @param snapshot - * @param snapshotCapturedIndex - * @param snapshotCapturedTerm + * Restores the replicated log to a state in the event of a save snapshot failure. */ - public void snapshotPreCommit(ByteString snapshot, - long snapshotCapturedIndex, long snapshotCapturedTerm); + void snapshotRollback(); /** - * Sets the Replicated log to state after snapshot success. + * Returns the size of the data in the log (in bytes). + * + * @return the size of the data in the log (in bytes) */ - public void snapshotCommit(); + int dataSize(); /** - * Restores the replicated log to a state in the event of a save snapshot failure + * Determines if a snapshot needs to be captured based on the count/memory consumed and initiates the capture. + * + * @param replicatedLogEntry the last log entry. + */ + void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry); + + /** + * Determines if a snapshot should be captured based on the count/memory consumed. + * + * @param logIndex the log index to use to determine if the log count has exceeded the threshold + * @return true if a snapshot should be captured, false otherwise */ - public void snapshotRollback(); + boolean shouldCaptureSnapshot(long logIndex); }