From 806f4dba9ba3db54d5320221a337f1906e6c0b1f Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Sun, 31 Dec 2023 04:15:08 +0100 Subject: [PATCH] Do not leak ReplicatedLogEntry in Replicate message While convenient, this is a leak of constructs and contains a debug-only access to data. While we are at it make Replicate a simple record. JIRA: CONTROLLER-2044 Change-Id: I92a364a9590771f0bb9278e2e8ca8176691b8f75 Signed-off-by: Robert Varga --- .../controller/cluster/raft/RaftActor.java | 4 +- .../cluster/raft/base/messages/Replicate.java | 33 +------------ .../raft/behaviors/AbstractLeader.java | 13 +++-- .../cluster/raft/behaviors/LeaderTest.java | 49 +++++++------------ 4 files changed, 29 insertions(+), 70 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 130340c82e..d71d879a5c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -621,8 +621,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (wasAppended && hasFollowers()) { // Send log entry for replication. - getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry, - !batchHint)); + getCurrentBehavior().handleMessage(getSelf(), + new Replicate(replicatedLogEntry.getIndex(), !batchHint, clientActor, identifier)); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/Replicate.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/Replicate.java index c58d86354a..edd4986a47 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/Replicate.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/Replicate.java @@ -5,40 +5,11 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.raft.base.messages; import akka.actor.ActorRef; -import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.yangtools.concepts.Identifier; -public class Replicate { - private final ActorRef clientActor; - private final Identifier identifier; - private final ReplicatedLogEntry replicatedLogEntry; - private final boolean sendImmediate; - - public Replicate(ActorRef clientActor, Identifier identifier, ReplicatedLogEntry replicatedLogEntry, - boolean sendImmediate) { - this.clientActor = clientActor; - this.identifier = identifier; - this.replicatedLogEntry = replicatedLogEntry; - this.sendImmediate = sendImmediate; - } - - public ActorRef getClientActor() { - return clientActor; - } - - public Identifier getIdentifier() { - return identifier; - } - - public ReplicatedLogEntry getReplicatedLogEntry() { - return replicatedLogEntry; - } - - public boolean isSendImmediate() { - return sendImmediate; - } +public record Replicate(long logIndex, boolean sendImmediate, ActorRef clientActor, Identifier identifier) { + // Nothing else here } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index a84f5b363f..b4d30c28b4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -647,17 +647,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } private void replicate(final Replicate replicate) { - long logIndex = replicate.getReplicatedLogEntry().getIndex(); + final long logIndex = replicate.logIndex(); - log.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}, isSendImmediate: {}", logName(), - replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass(), - replicate.isSendImmediate()); + log.debug("{}: Replicate message: identifier: {}, logIndex: {}, isSendImmediate: {}", logName(), + replicate.identifier(), logIndex, replicate.sendImmediate()); // Create a tracker entry we will use this later to notify the // client actor - final var clientActor = replicate.getClientActor(); + final var clientActor = replicate.clientActor(); if (clientActor != null) { - trackers.add(new ClientRequestTrackerImpl(clientActor, replicate.getIdentifier(), logIndex)); + trackers.add(new ClientRequestTrackerImpl(clientActor, replicate.identifier(), logIndex)); } boolean applyModificationToState = !context.anyVotingPeers() @@ -668,7 +667,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { applyLogToStateMachine(logIndex); } - if (replicate.isSendImmediate() && !followerToLog.isEmpty()) { + if (replicate.sendImmediate() && !followerToLog.isEmpty()) { sendAppendEntries(0, false); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index c3f3b3296b..bb8b4ca799 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -168,9 +168,8 @@ public class LeaderTest extends AbstractLeaderTest { private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, final long index, final Payload payload) { - SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload); - actorContext.getReplicatedLog().append(newEntry); - return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true)); + actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(index, term, payload)); + return leader.handleMessage(leaderActor, new Replicate(index, true, null, null)); } @Test @@ -546,16 +545,14 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.setLastApplied(0); - long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1; - long term = actorContext.getTermInformation().getCurrentTerm(); - ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry( - newLogIndex, term, new MockRaftActorContext.MockPayload("foo")); + final long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1; + final long term = actorContext.getTermInformation().getCurrentTerm(); + final var data = new MockRaftActorContext.MockPayload("foo"); - actorContext.getReplicatedLog().append(newEntry); + actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(newLogIndex, term, data)); final Identifier id = new MockIdentifier("state-id"); - RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, - new Replicate(leaderActor, id, newEntry, true)); + final var raftBehavior = leader.handleMessage(leaderActor, new Replicate(newLogIndex, true, leaderActor, id)); // State should not change assertTrue(raftBehavior instanceof Leader); @@ -564,8 +561,7 @@ public class LeaderTest extends AbstractLeaderTest { // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous // one since lastApplied state is 0. - List applyStateList = MessageCollectorActor.getAllMatching( - leaderActor, ApplyState.class); + final var applyStateList = MessageCollectorActor.getAllMatching(leaderActor, ApplyState.class); assertEquals("ApplyState count", newLogIndex, applyStateList.size()); for (int i = 0; i <= newLogIndex - 1; i++) { @@ -575,7 +571,7 @@ public class LeaderTest extends AbstractLeaderTest { } ApplyState last = applyStateList.get((int) newLogIndex - 1); - assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData()); + assertEquals("getData", data, last.getReplicatedLogEntry().getData()); assertEquals("getIdentifier", id, last.getIdentifier()); } @@ -670,18 +666,15 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); // new entry - SimpleReplicatedLogEntry entry = - new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, - new MockRaftActorContext.MockPayload("D")); - - actorContext.getReplicatedLog().append(entry); + actorContext.getReplicatedLog().append( + new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D"))); //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex RaftActorBehavior raftBehavior = leader.handleMessage( - leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); + leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id"))); assertTrue(raftBehavior instanceof Leader); @@ -718,15 +711,13 @@ public class LeaderTest extends AbstractLeaderTest { leader.setSnapshotHolder(null); // new entry - SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, - new MockRaftActorContext.MockPayload("D")); - - actorContext.getReplicatedLog().append(entry); + actorContext.getReplicatedLog().append( + new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D"))); //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); - leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); + leader.handleMessage(leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id"))); assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); @@ -738,7 +729,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(2, cs.getLastTerm()); // if an initiate is started again when first is in progress, it shouldnt initiate Capture - leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); + leader.handleMessage(leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id"))); assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); } @@ -781,10 +772,8 @@ public class LeaderTest extends AbstractLeaderTest { } // new entry - SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, - new MockRaftActorContext.MockPayload("D")); - - actorContext.getReplicatedLog().append(entry); + actorContext.getReplicatedLog().append( + new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D"))); //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); @@ -808,7 +797,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.clearMessages(followerActor); // Sending Replicate message should not initiate another capture since the first is in progress. - leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); + leader.handleMessage(leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id"))); assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture. -- 2.36.6