X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fmessages%2FAppendEntries.java;h=7ebc857444b1d2fae58cd2ce1b36337646b4068a;hp=94366efd5e897657744df37b0a475a847273e1b4;hb=97542f208267cb5392fc8c8d9baf6c1d3ee4ae32;hpb=4a8d4efda0828bc0c147dee3644c51baa6ff5a15 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java index 94366efd5e..7ebc857444 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java @@ -8,26 +8,22 @@ package org.opendaylight.controller.cluster.raft.messages; -import com.google.protobuf.GeneratedMessage; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.List; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; -import org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; /** * Invoked by leader to replicate log entries (§5.3); also used as * heartbeat (§5.2). */ public class AppendEntries extends AbstractRaftRPC { - - public static final Class SERIALIZABLE_CLASS = AppendEntriesMessages.AppendEntries.class; - - private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(AppendEntries.class); + private static final long serialVersionUID = 1L; // So that follower can redirect clients private final String leaderId; @@ -38,21 +34,27 @@ public class AppendEntries extends AbstractRaftRPC { // term of prevLogIndex entry private final long prevLogTerm; - // log entries to store (empty for heartbeat; - // may send more than one for efficiency) + // log entries to store (empty for heart beat - may send more than one for efficiency) private final List entries; // leader's commitIndex private final long leaderCommit; - public AppendEntries(long term, String leaderId, long prevLogIndex, - long prevLogTerm, List entries, long leaderCommit) { + // index which has been replicated successfully to all followers, -1 if none + private final long replicatedToAllIndex; + + private final short payloadVersion; + + public AppendEntries(long term, String leaderId, long prevLogIndex, long prevLogTerm, + List entries, long leaderCommit, long replicatedToAllIndex, short payloadVersion) { super(term); this.leaderId = leaderId; this.prevLogIndex = prevLogIndex; this.prevLogTerm = prevLogTerm; this.entries = entries; this.leaderCommit = leaderCommit; + this.replicatedToAllIndex = replicatedToAllIndex; + this.payloadVersion = payloadVersion; } public String getLeaderId() { @@ -75,89 +77,83 @@ public class AppendEntries extends AbstractRaftRPC { return leaderCommit; } - @Override public String toString() { - final StringBuilder sb = - new StringBuilder("AppendEntries{"); - sb.append("term=").append(getTerm()); - sb.append("leaderId='").append(leaderId).append('\''); - sb.append(", prevLogIndex=").append(prevLogIndex); - sb.append(", prevLogTerm=").append(prevLogTerm); - sb.append(", entries=").append(entries); - sb.append(", leaderCommit=").append(leaderCommit); - sb.append('}'); - return sb.toString(); + public long getReplicatedToAllIndex() { + return replicatedToAllIndex; } - public Object toSerializable(){ - AppendEntriesMessages.AppendEntries.Builder to = AppendEntriesMessages.AppendEntries.newBuilder(); - to.setTerm(this.getTerm()) - .setLeaderId(this.getLeaderId()) - .setPrevLogTerm(this.getPrevLogTerm()) - .setPrevLogIndex(this.getPrevLogIndex()) - .setLeaderCommit(this.getLeaderCommit()); - - for (ReplicatedLogEntry logEntry : this.getEntries()) { - - AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Builder arBuilder = - AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.newBuilder(); + public short getPayloadVersion() { + return payloadVersion; + } - AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.Builder arpBuilder = - AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.newBuilder(); + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("AppendEntries [leaderId=").append(leaderId).append(", prevLogIndex=").append(prevLogIndex) + .append(", prevLogTerm=").append(prevLogTerm).append(", leaderCommit=").append(leaderCommit) + .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append(", payloadVersion=") + .append(payloadVersion).append(", entries=").append(entries).append("]"); + return builder.toString(); + } - //get the client specific payload extensions and add them to the payload builder - Map map = logEntry.getData().encode(); - Iterator> iter = map.entrySet().iterator(); + private Object writeReplace() { + return new Proxy(this); + } - while (iter.hasNext()) { - Map.Entry entry = iter.next(); - arpBuilder.setExtension(entry.getKey(), entry.getValue()); - } + private static class Proxy implements Externalizable { + private static final long serialVersionUID = 1L; - arpBuilder.setClientPayloadClassName(logEntry.getData().getClientPayloadClassName()); + private AppendEntries appendEntries; - arBuilder.setData(arpBuilder).setIndex(logEntry.getIndex()).setTerm(logEntry.getTerm()); - to.addLogEntries(arBuilder); + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy() { } - return to.build(); - } + Proxy(AppendEntries appendEntries) { + this.appendEntries = appendEntries; + } - public static AppendEntries fromSerializable(Object o){ - AppendEntriesMessages.AppendEntries from = (AppendEntriesMessages.AppendEntries) o; - - List logEntryList = new ArrayList<>(); - for (AppendEntriesMessages.AppendEntries.ReplicatedLogEntry leProtoBuff : from.getLogEntriesList()) { - - Payload payload = null ; - try { - if(leProtoBuff.getData() != null && leProtoBuff.getData().getClientPayloadClassName() != null) { - String clientPayloadClassName = leProtoBuff.getData().getClientPayloadClassName(); - payload = (Payload)Class.forName(clientPayloadClassName).newInstance(); - payload = payload.decode(leProtoBuff.getData()); - payload.setClientPayloadClassName(clientPayloadClassName); - } else { - LOG.error("Payload is null or payload does not have client payload class name"); - } - - } catch (InstantiationException e) { - LOG.error("InstantiationException when instantiating "+leProtoBuff.getData().getClientPayloadClassName(), e); - } catch (IllegalAccessException e) { - LOG.error("IllegalAccessException when accessing "+leProtoBuff.getData().getClientPayloadClassName(), e); - } catch (ClassNotFoundException e) { - LOG.error("ClassNotFoundException when loading "+leProtoBuff.getData().getClientPayloadClassName(), e); + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeLong(appendEntries.getTerm()); + out.writeObject(appendEntries.leaderId); + out.writeLong(appendEntries.prevLogTerm); + out.writeLong(appendEntries.prevLogIndex); + out.writeLong(appendEntries.leaderCommit); + out.writeLong(appendEntries.replicatedToAllIndex); + out.writeShort(appendEntries.payloadVersion); + + out.writeInt(appendEntries.entries.size()); + for (ReplicatedLogEntry e: appendEntries.entries) { + out.writeLong(e.getIndex()); + out.writeLong(e.getTerm()); + out.writeObject(e.getData()); } - ReplicatedLogEntry logEntry = new ReplicatedLogImplEntry( - leProtoBuff.getIndex(), leProtoBuff.getTerm(), payload); - logEntryList.add(logEntry); } - AppendEntries to = new AppendEntries(from.getTerm(), - from.getLeaderId(), - from.getPrevLogIndex(), - from.getPrevLogTerm(), - logEntryList, - from.getLeaderCommit()); + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + long term = in.readLong(); + String leaderId = (String) in.readObject(); + long prevLogTerm = in.readLong(); + long prevLogIndex = in.readLong(); + long leaderCommit = in.readLong(); + long replicatedToAllIndex = in.readLong(); + short payloadVersion = in.readShort(); + + int size = in.readInt(); + List entries = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + entries.add(new ReplicatedLogImplEntry(in.readLong(), in.readLong(), (Payload) in.readObject())); + } - return to; + appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, + replicatedToAllIndex, payloadVersion); + } + + private Object readResolve() { + return appendEntries; + } } }