Do not leak ReplicatedLogEntry in Replicate message 07/109507/7
authorRobert Varga <robert.varga@pantheon.tech>
Sun, 31 Dec 2023 03:15:08 +0000 (04:15 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Sun, 31 Dec 2023 11:46:37 +0000 (12:46 +0100)
While convenient, this is a leak of constructs and contains a debug-only
access to data. While we are at it make Replicate a simple record.

JIRA: CONTROLLER-2044
Change-Id: I92a364a9590771f0bb9278e2e8ca8176691b8f75
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/Replicate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java

index 130340c82eb44f8f91600c9716fe295024319da5..d71d879a5c6016140615233479411b0a4242a105 100644 (file)
@@ -621,8 +621,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         if (wasAppended && hasFollowers()) {
             // Send log entry for replication.
-            getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry,
-                    !batchHint));
+            getCurrentBehavior().handleMessage(getSelf(),
+                new Replicate(replicatedLogEntry.getIndex(), !batchHint, clientActor, identifier));
         }
     }
 
index c58d86354a917d9fc968de3eff2b97ba6107d356..edd4986a47b4b36b036f2f0ea209ac32aab69b0c 100644 (file)
@@ -5,40 +5,11 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.raft.base.messages;
 
 import akka.actor.ActorRef;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.yangtools.concepts.Identifier;
 
-public class Replicate {
-    private final ActorRef clientActor;
-    private final Identifier identifier;
-    private final ReplicatedLogEntry replicatedLogEntry;
-    private final boolean sendImmediate;
-
-    public Replicate(ActorRef clientActor, Identifier identifier, ReplicatedLogEntry replicatedLogEntry,
-            boolean sendImmediate) {
-        this.clientActor = clientActor;
-        this.identifier = identifier;
-        this.replicatedLogEntry = replicatedLogEntry;
-        this.sendImmediate = sendImmediate;
-    }
-
-    public ActorRef getClientActor() {
-        return clientActor;
-    }
-
-    public Identifier getIdentifier() {
-        return identifier;
-    }
-
-    public ReplicatedLogEntry getReplicatedLogEntry() {
-        return replicatedLogEntry;
-    }
-
-    public boolean isSendImmediate() {
-        return sendImmediate;
-    }
+public record Replicate(long logIndex, boolean sendImmediate, ActorRef clientActor, Identifier identifier) {
+    // Nothing else here
 }
index a84f5b363fba94ec54d2c3a88a68063061f3ecde..b4d30c28b43f47a39cba70c08fb5765722e6300f 100644 (file)
@@ -647,17 +647,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     private void replicate(final Replicate replicate) {
-        long logIndex = replicate.getReplicatedLogEntry().getIndex();
+        final long logIndex = replicate.logIndex();
 
-        log.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}, isSendImmediate: {}", logName(),
-                replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass(),
-                replicate.isSendImmediate());
+        log.debug("{}: Replicate message: identifier: {}, logIndex: {}, isSendImmediate: {}", logName(),
+                replicate.identifier(), logIndex, replicate.sendImmediate());
 
         // Create a tracker entry we will use this later to notify the
         // client actor
-        final var clientActor = replicate.getClientActor();
+        final var clientActor = replicate.clientActor();
         if (clientActor != null) {
-            trackers.add(new ClientRequestTrackerImpl(clientActor, replicate.getIdentifier(), logIndex));
+            trackers.add(new ClientRequestTrackerImpl(clientActor, replicate.identifier(), logIndex));
         }
 
         boolean applyModificationToState = !context.anyVotingPeers()
@@ -668,7 +667,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             applyLogToStateMachine(logIndex);
         }
 
-        if (replicate.isSendImmediate() && !followerToLog.isEmpty()) {
+        if (replicate.sendImmediate() && !followerToLog.isEmpty()) {
             sendAppendEntries(0, false);
         }
     }
index c3f3b3296bc6456b8b21e4126c2fe05399c2f689..bb8b4ca799a810da614cf7b120aab49bde04da34 100644 (file)
@@ -168,9 +168,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
     private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, final long index,
             final Payload payload) {
-        SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
-        actorContext.getReplicatedLog().append(newEntry);
-        return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
+        actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(index, term, payload));
+        return leader.handleMessage(leaderActor, new Replicate(index, true, null, null));
     }
 
     @Test
@@ -546,16 +545,14 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         actorContext.setLastApplied(0);
 
-        long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
-        long term = actorContext.getTermInformation().getCurrentTerm();
-        ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
-                newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
+        final long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
+        final long term = actorContext.getTermInformation().getCurrentTerm();
+        final var data = new MockRaftActorContext.MockPayload("foo");
 
-        actorContext.getReplicatedLog().append(newEntry);
+        actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(newLogIndex, term, data));
 
         final Identifier id = new MockIdentifier("state-id");
-        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
-                new Replicate(leaderActor, id, newEntry, true));
+        final var raftBehavior = leader.handleMessage(leaderActor, new Replicate(newLogIndex, true, leaderActor, id));
 
         // State should not change
         assertTrue(raftBehavior instanceof Leader);
@@ -564,8 +561,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
         // one since lastApplied state is 0.
-        List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
-                leaderActor, ApplyState.class);
+        final var applyStateList = MessageCollectorActor.getAllMatching(leaderActor, ApplyState.class);
         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
 
         for (int i = 0; i <= newLogIndex - 1; i++) {
@@ -575,7 +571,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         }
 
         ApplyState last = applyStateList.get((int) newLogIndex - 1);
-        assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
+        assertEquals("getData", data, last.getReplicatedLogEntry().getData());
         assertEquals("getIdentifier", id, last.getIdentifier());
     }
 
@@ -670,18 +666,15 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
         // new entry
-        SimpleReplicatedLogEntry entry =
-                new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
-                        new MockRaftActorContext.MockPayload("D"));
-
-        actorContext.getReplicatedLog().append(entry);
+        actorContext.getReplicatedLog().append(
+            new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")));
 
         //update follower timestamp
         leader.markFollowerActive(FOLLOWER_ID);
 
         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
         RaftActorBehavior raftBehavior = leader.handleMessage(
-                leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
+                leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id")));
 
         assertTrue(raftBehavior instanceof Leader);
 
@@ -718,15 +711,13 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader.setSnapshotHolder(null);
 
         // new entry
-        SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
-                new MockRaftActorContext.MockPayload("D"));
-
-        actorContext.getReplicatedLog().append(entry);
+        actorContext.getReplicatedLog().append(
+            new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")));
 
         //update follower timestamp
         leader.markFollowerActive(FOLLOWER_ID);
 
-        leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
+        leader.handleMessage(leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id")));
 
         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
 
@@ -738,7 +729,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertEquals(2, cs.getLastTerm());
 
         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
-        leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
+        leader.handleMessage(leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id")));
 
         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
     }
@@ -781,10 +772,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         }
 
         // new entry
-        SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
-                new MockRaftActorContext.MockPayload("D"));
-
-        actorContext.getReplicatedLog().append(entry);
+        actorContext.getReplicatedLog().append(
+            new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")));
 
         //update follower timestamp
         leader.markFollowerActive(FOLLOWER_ID);
@@ -808,7 +797,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.clearMessages(followerActor);
 
         // Sending Replicate message should not initiate another capture since the first is in progress.
-        leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
+        leader.handleMessage(leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id")));
         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
 
         // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.