X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fmessages%2FAppendEntries.java;h=95965d560f4e02c319c2fadb56394f5ecc0a2ceb;hb=534bf6f83465cc8a575b097c1e28fbb1f34d110a;hp=d8075f4381957a66e1958d36f7b9886c28efb5c5;hpb=a564647b197ef00124b9ae6aa00578dd73e27aa1;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java index d8075f4381..95965d560f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java @@ -5,37 +5,32 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.raft.messages; -import com.google.protobuf.GeneratedMessage; +import static java.util.Objects.requireNonNull; + +import com.google.common.annotations.VisibleForTesting; +import java.io.Externalizable; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; +import java.io.ObjectInput; +import java.io.ObjectOutput; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; -import java.util.Map; +import java.util.Optional; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.raft.RaftVersions; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; -import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; +import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; /** - * Invoked by leader to replicate log entries (§5.3); also used as - * heartbeat (§5.2). + * Invoked by leader to replicate log entries (§5.3); also used as heartbeat (§5.2). */ -public class AppendEntries extends AbstractRaftRPC { +public final class AppendEntries extends AbstractRaftRPC { private static final long serialVersionUID = 1L; - public static final Class LEGACY_SERIALIZABLE_CLASS = - AppendEntriesMessages.AppendEntries.class; - - private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(AppendEntries.class); - // So that follower can redirect clients - private final String leaderId; + private final @NonNull String leaderId; // Index of log entry immediately preceding new ones private final long prevLogIndex; @@ -43,9 +38,8 @@ public class AppendEntries extends AbstractRaftRPC { // term of prevLogIndex entry private final long prevLogTerm; - // log entries to store (empty for heartbeat; - // may send more than one for efficiency) - private transient List entries; + // log entries to store (empty for heart beat - may send more than one for efficiency) + private final @NonNull List entries; // leader's commitIndex private final long leaderCommit; @@ -55,41 +49,46 @@ public class AppendEntries extends AbstractRaftRPC { private final short payloadVersion; - public AppendEntries(long term, String leaderId, long prevLogIndex, long prevLogTerm, - List entries, long leaderCommit, long replicatedToAllIndex, short payloadVersion) { + private final short recipientRaftVersion; + + private final short leaderRaftVersion; + + private final String leaderAddress; + + private AppendEntries(final long term, @NonNull final String leaderId, final long prevLogIndex, + final long prevLogTerm, @NonNull final List entries, final long leaderCommit, + final long replicatedToAllIndex, final short payloadVersion, final short recipientRaftVersion, + final short leaderRaftVersion, @Nullable final String leaderAddress) { super(term); - this.leaderId = leaderId; + this.leaderId = requireNonNull(leaderId); this.prevLogIndex = prevLogIndex; this.prevLogTerm = prevLogTerm; - this.entries = entries; + this.entries = requireNonNull(entries); this.leaderCommit = leaderCommit; this.replicatedToAllIndex = replicatedToAllIndex; this.payloadVersion = payloadVersion; + this.recipientRaftVersion = recipientRaftVersion; + this.leaderRaftVersion = leaderRaftVersion; + this.leaderAddress = leaderAddress; } - private void writeObject(ObjectOutputStream out) throws IOException { - out.writeShort(RaftVersions.CURRENT_VERSION); - out.defaultWriteObject(); - - out.writeInt(entries.size()); - for(ReplicatedLogEntry e: entries) { - out.writeObject(e); - } + public AppendEntries(final long term, final @NonNull String leaderId, final long prevLogIndex, + final long prevLogTerm, final @NonNull List entries, final long leaderCommit, + final long replicatedToAllIndex, final short payloadVersion, final short recipientRaftVersion, + final @Nullable String leaderAddress) { + this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion, + recipientRaftVersion, RaftVersions.CURRENT_VERSION, leaderAddress); } - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.readShort(); // raft version - - in.defaultReadObject(); - - int size = in.readInt(); - entries = new ArrayList<>(size); - for(int i = 0; i < size; i++) { - entries.add((ReplicatedLogEntry) in.readObject()); - } + @VisibleForTesting + public AppendEntries(final long term, final @NonNull String leaderId, final long prevLogIndex, + final long prevLogTerm, final @NonNull List entries, final long leaderCommit, + final long replicatedToAllIndex, final short payloadVersion) { + this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion, + RaftVersions.CURRENT_VERSION, null); } - public String getLeaderId() { + public @NonNull String getLeaderId() { return leaderId; } @@ -101,7 +100,7 @@ public class AppendEntries extends AbstractRaftRPC { return prevLogTerm; } - public List getEntries() { + public @NonNull List getEntries() { return entries; } @@ -117,109 +116,160 @@ public class AppendEntries extends AbstractRaftRPC { return payloadVersion; } + public Optional getLeaderAddress() { + return Optional.ofNullable(leaderAddress); + } + + public short getLeaderRaftVersion() { + return leaderRaftVersion; + } + @Override public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("AppendEntries [leaderId=").append(leaderId).append(", prevLogIndex=").append(prevLogIndex) - .append(", prevLogTerm=").append(prevLogTerm).append(", leaderCommit=").append(leaderCommit) - .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append(", payloadVersion=") - .append(payloadVersion).append(", entries=").append(entries).append("]"); - return builder.toString(); + return "AppendEntries [leaderId=" + leaderId + + ", prevLogIndex=" + prevLogIndex + + ", prevLogTerm=" + prevLogTerm + + ", leaderCommit=" + leaderCommit + + ", replicatedToAllIndex=" + replicatedToAllIndex + + ", payloadVersion=" + payloadVersion + + ", recipientRaftVersion=" + recipientRaftVersion + + ", leaderRaftVersion=" + leaderRaftVersion + + ", leaderAddress=" + leaderAddress + + ", entries=" + entries + "]"; } - public Object toSerializable() { - return toSerializable(RaftVersions.CURRENT_VERSION); + @Override + Object writeReplace() { + return recipientRaftVersion >= RaftVersions.FLUORINE_VERSION ? new ProxyV2(this) : new Proxy(this); } - public Object toSerializable(short version) { - if(version < RaftVersions.LITHIUM_VERSION) { - return toLegacySerializable(); - } else { - return this; - } - } + /** + * Fluorine version that adds the leader address. + */ + private static class ProxyV2 implements Externalizable { + private static final long serialVersionUID = 1L; - @SuppressWarnings({ "rawtypes", "unchecked" }) - private Object toLegacySerializable() { - AppendEntriesMessages.AppendEntries.Builder to = AppendEntriesMessages.AppendEntries.newBuilder(); - to.setTerm(this.getTerm()) - .setLeaderId(this.getLeaderId()) - .setPrevLogTerm(this.getPrevLogTerm()) - .setPrevLogIndex(this.getPrevLogIndex()) - .setLeaderCommit(this.getLeaderCommit()); + private AppendEntries appendEntries; - for (ReplicatedLogEntry logEntry : this.getEntries()) { + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public ProxyV2() { + } - AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Builder arBuilder = - AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.newBuilder(); + ProxyV2(final AppendEntries appendEntries) { + this.appendEntries = appendEntries; + } - AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.Builder arpBuilder = - AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.newBuilder(); + @Override + public void writeExternal(final ObjectOutput out) throws IOException { + out.writeShort(appendEntries.leaderRaftVersion); + out.writeLong(appendEntries.getTerm()); + out.writeObject(appendEntries.leaderId); + out.writeLong(appendEntries.prevLogTerm); + out.writeLong(appendEntries.prevLogIndex); + out.writeLong(appendEntries.leaderCommit); + out.writeLong(appendEntries.replicatedToAllIndex); + out.writeShort(appendEntries.payloadVersion); + + out.writeInt(appendEntries.entries.size()); + for (ReplicatedLogEntry e: appendEntries.entries) { + out.writeLong(e.getIndex()); + out.writeLong(e.getTerm()); + out.writeObject(e.getData()); + } - //get the client specific payload extensions and add them to the payload builder - Map map = logEntry.getData().encode(); - Iterator> iter = map.entrySet().iterator(); + out.writeObject(appendEntries.leaderAddress); + } - while (iter.hasNext()) { - Map.Entry entry = iter.next(); - arpBuilder.setExtension(entry.getKey(), entry.getValue()); + @Override + public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { + short leaderRaftVersion = in.readShort(); + long term = in.readLong(); + String leaderId = (String) in.readObject(); + long prevLogTerm = in.readLong(); + long prevLogIndex = in.readLong(); + long leaderCommit = in.readLong(); + long replicatedToAllIndex = in.readLong(); + short payloadVersion = in.readShort(); + + int size = in.readInt(); + List entries = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject())); } - arpBuilder.setClientPayloadClassName(logEntry.getData().getClientPayloadClassName()); + String leaderAddress = (String)in.readObject(); - arBuilder.setData(arpBuilder).setIndex(logEntry.getIndex()).setTerm(logEntry.getTerm()); - to.addLogEntries(arBuilder); + appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, + replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, leaderRaftVersion, + leaderAddress); } - return to.build(); + private Object readResolve() { + return appendEntries; + } } - public static AppendEntries fromSerializable(Object serialized) { - if(serialized instanceof AppendEntries) { - return (AppendEntries)serialized; + /** + * Pre-Fluorine version. + */ + @Deprecated + private static class Proxy implements Externalizable { + private static final long serialVersionUID = 1L; + + private AppendEntries appendEntries; + + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy() { } - else { - return fromLegacySerializable((AppendEntriesMessages.AppendEntries) serialized); + + Proxy(final AppendEntries appendEntries) { + this.appendEntries = appendEntries; } - } - private static AppendEntries fromLegacySerializable(AppendEntriesMessages.AppendEntries from) { - List logEntryList = new ArrayList<>(); - for (AppendEntriesMessages.AppendEntries.ReplicatedLogEntry leProtoBuff : from.getLogEntriesList()) { - - Payload payload = null ; - try { - if(leProtoBuff.getData() != null && leProtoBuff.getData().getClientPayloadClassName() != null) { - String clientPayloadClassName = leProtoBuff.getData().getClientPayloadClassName(); - payload = (Payload) Class.forName(clientPayloadClassName).newInstance(); - payload = payload.decode(leProtoBuff.getData()); - } else { - LOG.error("Payload is null or payload does not have client payload class name"); - } - - } catch (InstantiationException e) { - LOG.error("InstantiationException when instantiating "+leProtoBuff.getData().getClientPayloadClassName(), e); - } catch (IllegalAccessException e) { - LOG.error("IllegalAccessException when accessing "+leProtoBuff.getData().getClientPayloadClassName(), e); - } catch (ClassNotFoundException e) { - LOG.error("ClassNotFoundException when loading "+leProtoBuff.getData().getClientPayloadClassName(), e); + @Override + public void writeExternal(final ObjectOutput out) throws IOException { + out.writeLong(appendEntries.getTerm()); + out.writeObject(appendEntries.leaderId); + out.writeLong(appendEntries.prevLogTerm); + out.writeLong(appendEntries.prevLogIndex); + out.writeLong(appendEntries.leaderCommit); + out.writeLong(appendEntries.replicatedToAllIndex); + out.writeShort(appendEntries.payloadVersion); + + out.writeInt(appendEntries.entries.size()); + for (ReplicatedLogEntry e: appendEntries.entries) { + out.writeLong(e.getIndex()); + out.writeLong(e.getTerm()); + out.writeObject(e.getData()); } - ReplicatedLogEntry logEntry = new ReplicatedLogImplEntry( - leProtoBuff.getIndex(), leProtoBuff.getTerm(), payload); - logEntryList.add(logEntry); } - AppendEntries to = new AppendEntries(from.getTerm(), - from.getLeaderId(), - from.getPrevLogIndex(), - from.getPrevLogTerm(), - logEntryList, - from.getLeaderCommit(), -1, (short)0); + @Override + public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { + long term = in.readLong(); + String leaderId = (String) in.readObject(); + long prevLogTerm = in.readLong(); + long prevLogIndex = in.readLong(); + long leaderCommit = in.readLong(); + long replicatedToAllIndex = in.readLong(); + short payloadVersion = in.readShort(); + + int size = in.readInt(); + List entries = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject())); + } - return to; - } + appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, + replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, RaftVersions.BORON_VERSION, null); + } - public static boolean isSerializedType(Object message) { - return message instanceof AppendEntries || LEGACY_SERIALIZABLE_CLASS.isInstance(message); + private Object readResolve() { + return appendEntries; + } } }