Simplify appendReceived() signature 32/116232/1
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 11 Apr 2025 10:42:12 +0000 (12:42 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 11 Apr 2025 10:43:02 +0000 (12:43 +0200)
We only need the appended entry (for now), perhaps at some point not
even that. Simplify the metehod signature and add some documentation
about what is going on.

JIRA: CONTROLLER-2137
Change-Id: Ida272793dee645ffba571185b1ccb127a06d380d
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
raft/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
raft/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockReplicatedLog.java

index f80f9f636b39cd8032a8d50e816a8700688a6baa..2fd510d894d0416cae4994194d790d55e37b5e72 100644 (file)
@@ -15,6 +15,7 @@ import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.spi.LogEntry;
 import org.opendaylight.raft.api.EntryMeta;
 
 /**
@@ -169,12 +170,12 @@ public interface ReplicatedLog {
      * Appends an entry received by a follower to the in-memory log and persists it as well, returning an indication
      * whether or not a snapshot should be taken.
      *
-     * @param <T> entry type
      * @param entry the entry to append
      * @param callback optional callback to be notified when persistence is complete
      * @return {@code true} if the journal requires trimming and a snapshot needs to be taken
      */
-    <T extends ReplicatedLogEntry> boolean appendReceived(@NonNull T entry, @Nullable Consumer<T> callback);
+    @NonNullByDefault
+    boolean appendReceived(ReplicatedLogEntry entry, @Nullable Consumer<LogEntry> callback);
 
     /**
      * Appends an entry submitted on the leader to the in-memory log and persists it as well.
index 6cbd42cf596487499d7caac4f2bdf9f071ec6fe2..ac6dd90ad461c9506cc236aa5c4a7921e01d622e 100644 (file)
@@ -11,6 +11,7 @@ import java.util.function.Consumer;
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.spi.LogEntry;
 import org.opendaylight.raft.api.EntryMeta;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -87,7 +88,7 @@ final class ReplicatedLogImpl extends AbstractReplicatedLog {
     }
 
     @Override
-    public <T extends ReplicatedLogEntry> boolean appendReceived(final T entry, final Consumer<T> callback)  {
+    public boolean appendReceived(final ReplicatedLogEntry entry, final Consumer<LogEntry> callback) {
         LOG.debug("{}: Append log entry and persist {} ", memberId, entry);
 
         // FIXME: When can 'false' happen? Wouldn't that be an indication that Follower.handleAppendEntries() is doing
@@ -115,7 +116,7 @@ final class ReplicatedLogImpl extends AbstractReplicatedLog {
     }
 
     private <T extends ReplicatedLogEntry> void invokeSync(final @NonNull T entry,
-            final @Nullable Consumer<T> callback) {
+            final @Nullable Consumer<? super T> callback) {
         LOG.debug("{}: persist complete {}", memberId, entry);
 
         dataSizeSinceLastSnapshot += entry.size();
index f7c7549dcdb14abeb0f1ee6e5b1586a2407cb161..a29a4d8b69b4f1f5b2d65c063e95001f65830660 100644 (file)
@@ -23,7 +23,6 @@ import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.controller.cluster.messaging.MessageAssembler;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.SnapshotManager.ApplyLeaderSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
@@ -309,7 +308,13 @@ public class Follower extends RaftActorBehavior {
         // applied to the state already, as the persistence callback occurs async, and we want those entries
         // purged from the persisted log as well.
         final var shouldCaptureSnapshot = new AtomicBoolean(false);
-        final Consumer<ReplicatedLogEntry> callback = logEntry -> {
+
+        // Note: this is quite fishy: while we are reusing this callback, the capture part of is only used for the
+        //       last entry -- so we really should split the below loop into 'leading entries' with a 'null' callback
+        //       and the last entry, with this callback.
+        //       Furthermore the correctness of this relies on the fact that the callback is invoked asynchronously,
+        //       i.e. in the next RaftActor processing cycle.
+        final Consumer<LogEntry> callback = logEntry -> {
             if (shouldCaptureSnapshot.get() && logEntry == entries.getLast()) {
                 context.getSnapshotManager().capture(replLog.lastMeta(), getReplicatedToAllIndex());
             }
index 3be18a4964f346c901ec4125e198ce9774123570..e13b58c5726a4747f5b0f8aaab569edd04c3ed3d 100644 (file)
@@ -20,6 +20,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
+import org.opendaylight.controller.cluster.raft.spi.LogEntry;
 import org.opendaylight.controller.cluster.raft.spi.TestTermInfoStore;
 import org.opendaylight.raft.api.EntryMeta;
 
@@ -124,7 +125,7 @@ public class MockRaftActorContext extends RaftActorContextImpl {
         }
 
         @Override
-        public <T extends ReplicatedLogEntry> boolean appendReceived(final T entry, final Consumer<T> callback) {
+        public boolean appendReceived(final ReplicatedLogEntry entry, final Consumer<LogEntry> callback) {
             // FIXME: assertion here?
             append(entry);
             if (callback != null) {
index 4cf0118f9430850fd5add7c39ee339158c32a038..6a9abe96c14e897e310f7d22beb553bb2f59de1a 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.raft;
 
 import java.util.function.Consumer;
+import org.opendaylight.controller.cluster.raft.spi.LogEntry;
 import org.opendaylight.raft.api.EntryMeta;
 
 final class MockReplicatedLog extends AbstractReplicatedLog {
@@ -21,7 +22,7 @@ final class MockReplicatedLog extends AbstractReplicatedLog {
     }
 
     @Override
-    public <T extends ReplicatedLogEntry> boolean appendReceived(final T entry, final Consumer<T> callback) {
+    public boolean appendReceived(final ReplicatedLogEntry entry, final Consumer<LogEntry> callback) {
         if (callback != null) {
             callback.accept(entry);
         }