X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fmessages%2FAppendEntries.java;h=892ea3356a58259b5a91b6ab3ea7bcfda8ef70b9;hb=HEAD;hp=d77a0841489347ea69177e80764b7fa6a45dd049;hpb=1ffd1f44c4beacdb28683c028bc0eaa209731098;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java index d77a084148..892ea3356a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java @@ -5,35 +5,32 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.raft.messages; import static java.util.Objects.requireNonNull; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; -import java.util.ArrayList; import java.util.List; -import java.util.Optional; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.raft.RaftVersions; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; /** - * Invoked by leader to replicate log entries (§5.3); also used as - * heartbeat (§5.2). + * Invoked by leader to replicate log entries (§5.3); also used as heartbeat (§5.2). */ -public class AppendEntries extends AbstractRaftRPC { +public final class AppendEntries extends AbstractRaftRPC { + @java.io.Serial private static final long serialVersionUID = 1L; // So that follower can redirect clients - private final String leaderId; + private final @NonNull String leaderId; // Index of log entry immediately preceding new ones private final long prevLogIndex; @@ -42,7 +39,7 @@ public class AppendEntries extends AbstractRaftRPC { private final long prevLogTerm; // log entries to store (empty for heart beat - may send more than one for efficiency) - private final List entries; + private final @NonNull List entries; // leader's commitIndex private final long leaderCommit; @@ -58,9 +55,10 @@ public class AppendEntries extends AbstractRaftRPC { private final String leaderAddress; - private AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm, - @Nonnull List entries, long leaderCommit, long replicatedToAllIndex, - short payloadVersion, short recipientRaftVersion, short leaderRaftVersion, @Nullable String leaderAddress) { + AppendEntries(final long term, @NonNull final String leaderId, final long prevLogIndex, + final long prevLogTerm, @NonNull final List entries, final long leaderCommit, + final long replicatedToAllIndex, final short payloadVersion, final short recipientRaftVersion, + final short leaderRaftVersion, @Nullable final String leaderAddress) { super(term); this.leaderId = requireNonNull(leaderId); this.prevLogIndex = prevLogIndex; @@ -74,23 +72,23 @@ public class AppendEntries extends AbstractRaftRPC { this.leaderAddress = leaderAddress; } - public AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm, - @Nonnull List entries, long leaderCommit, long replicatedToAllIndex, - short payloadVersion, short recipientRaftVersion, @Nullable String leaderAddress) { + public AppendEntries(final long term, final @NonNull String leaderId, final long prevLogIndex, + final long prevLogTerm, final @NonNull List entries, final long leaderCommit, + final long replicatedToAllIndex, final short payloadVersion, final short recipientRaftVersion, + final @Nullable String leaderAddress) { this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion, recipientRaftVersion, RaftVersions.CURRENT_VERSION, leaderAddress); } @VisibleForTesting - public AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm, - @Nonnull List entries, long leaderCommit, long replicatedToAllIndex, - short payloadVersion) { + public AppendEntries(final long term, final @NonNull String leaderId, final long prevLogIndex, + final long prevLogTerm, final @NonNull List entries, final long leaderCommit, + final long replicatedToAllIndex, final short payloadVersion) { this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, null); } - @Nonnull - public String getLeaderId() { + public @NonNull String getLeaderId() { return leaderId; } @@ -102,8 +100,7 @@ public class AppendEntries extends AbstractRaftRPC { return prevLogTerm; } - @Nonnull - public List getEntries() { + public @NonNull List getEntries() { return entries; } @@ -119,8 +116,8 @@ public class AppendEntries extends AbstractRaftRPC { return payloadVersion; } - public Optional getLeaderAddress() { - return Optional.ofNullable(leaderAddress); + public @Nullable String leaderAddress() { + return leaderAddress; } public short getLeaderRaftVersion() { @@ -141,14 +138,16 @@ public class AppendEntries extends AbstractRaftRPC { + ", entries=" + entries + "]"; } - private Object writeReplace() { - return recipientRaftVersion >= RaftVersions.FLUORINE_VERSION ? new ProxyV2(this) : new Proxy(this); + @Override + Object writeReplace() { + return recipientRaftVersion <= RaftVersions.FLUORINE_VERSION ? new ProxyV2(this) : new AE(this); } /** * Fluorine version that adds the leader address. */ private static class ProxyV2 implements Externalizable { + @java.io.Serial private static final long serialVersionUID = 1L; private AppendEntries appendEntries; @@ -159,12 +158,12 @@ public class AppendEntries extends AbstractRaftRPC { public ProxyV2() { } - ProxyV2(AppendEntries appendEntries) { + ProxyV2(final AppendEntries appendEntries) { this.appendEntries = appendEntries; } @Override - public void writeExternal(ObjectOutput out) throws IOException { + public void writeExternal(final ObjectOutput out) throws IOException { out.writeShort(appendEntries.leaderRaftVersion); out.writeLong(appendEntries.getTerm()); out.writeObject(appendEntries.leaderId); @@ -185,7 +184,7 @@ public class AppendEntries extends AbstractRaftRPC { } @Override - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { short leaderRaftVersion = in.readShort(); long term = in.readLong(); String leaderId = (String) in.readObject(); @@ -196,80 +195,19 @@ public class AppendEntries extends AbstractRaftRPC { short payloadVersion = in.readShort(); int size = in.readInt(); - List entries = new ArrayList<>(size); + var entries = ImmutableList.builderWithExpectedSize(size); for (int i = 0; i < size; i++) { entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject())); } String leaderAddress = (String)in.readObject(); - appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, + appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries.build(), leaderCommit, replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, leaderRaftVersion, leaderAddress); } - private Object readResolve() { - return appendEntries; - } - } - - /** - * Pre-Fluorine version. - */ - @Deprecated - private static class Proxy implements Externalizable { - private static final long serialVersionUID = 1L; - - private AppendEntries appendEntries; - - // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't - // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. - @SuppressWarnings("checkstyle:RedundantModifier") - public Proxy() { - } - - Proxy(AppendEntries appendEntries) { - this.appendEntries = appendEntries; - } - - @Override - public void writeExternal(ObjectOutput out) throws IOException { - out.writeLong(appendEntries.getTerm()); - out.writeObject(appendEntries.leaderId); - out.writeLong(appendEntries.prevLogTerm); - out.writeLong(appendEntries.prevLogIndex); - out.writeLong(appendEntries.leaderCommit); - out.writeLong(appendEntries.replicatedToAllIndex); - out.writeShort(appendEntries.payloadVersion); - - out.writeInt(appendEntries.entries.size()); - for (ReplicatedLogEntry e: appendEntries.entries) { - out.writeLong(e.getIndex()); - out.writeLong(e.getTerm()); - out.writeObject(e.getData()); - } - } - - @Override - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - long term = in.readLong(); - String leaderId = (String) in.readObject(); - long prevLogTerm = in.readLong(); - long prevLogIndex = in.readLong(); - long leaderCommit = in.readLong(); - long replicatedToAllIndex = in.readLong(); - short payloadVersion = in.readShort(); - - int size = in.readInt(); - List entries = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject())); - } - - appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, - replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, RaftVersions.BORON_VERSION, null); - } - + @java.io.Serial private Object readResolve() { return appendEntries; }