package org.opendaylight.controller.cluster.raft.messages;
-import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
/**
* Invoked by leader to replicate log entries (§5.3); also used as
* heartbeat (§5.2).
*/
public class AppendEntries extends AbstractRaftRPC {
- public static final Class<AppendEntriesMessages.AppendEntries> SERIALIZABLE_CLASS = AppendEntriesMessages.AppendEntries.class;
-
- private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(AppendEntries.class);
private static final long serialVersionUID = 1L;
// So that follower can redirect clients
// term of prevLogIndex entry
private final long prevLogTerm;
- // log entries to store (empty for heartbeat;
- // may send more than one for efficiency)
+ // log entries to store (empty for heart beat - may send more than one for efficiency)
private final List<ReplicatedLogEntry> entries;
// leader's commitIndex
private final long leaderCommit;
- public AppendEntries(long term, String leaderId, long prevLogIndex,
- long prevLogTerm, List<ReplicatedLogEntry> entries, long leaderCommit) {
+ // index which has been replicated successfully to all followers, -1 if none
+ private final long replicatedToAllIndex;
+
+ private final short payloadVersion;
+
+ public AppendEntries(long term, String leaderId, long prevLogIndex, long prevLogTerm,
+ List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex, short payloadVersion) {
super(term);
this.leaderId = leaderId;
this.prevLogIndex = prevLogIndex;
this.prevLogTerm = prevLogTerm;
this.entries = entries;
this.leaderCommit = leaderCommit;
+ this.replicatedToAllIndex = replicatedToAllIndex;
+ this.payloadVersion = payloadVersion;
}
public String getLeaderId() {
return leaderCommit;
}
- @Override public String toString() {
- final StringBuilder sb =
- new StringBuilder("AppendEntries{");
- sb.append("term=").append(getTerm());
- sb.append("leaderId='").append(leaderId).append('\'');
- sb.append(", prevLogIndex=").append(prevLogIndex);
- sb.append(", prevLogTerm=").append(prevLogTerm);
- sb.append(", entries=").append(entries);
- sb.append(", leaderCommit=").append(leaderCommit);
- sb.append('}');
- return sb.toString();
+ public long getReplicatedToAllIndex() {
+ return replicatedToAllIndex;
}
- public <T extends Object> Object toSerializable(){
- AppendEntriesMessages.AppendEntries.Builder to = AppendEntriesMessages.AppendEntries.newBuilder();
- to.setTerm(this.getTerm())
- .setLeaderId(this.getLeaderId())
- .setPrevLogTerm(this.getPrevLogTerm())
- .setPrevLogIndex(this.getPrevLogIndex())
- .setLeaderCommit(this.getLeaderCommit());
-
- for (ReplicatedLogEntry logEntry : this.getEntries()) {
-
- AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Builder arBuilder =
- AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.newBuilder();
+ public short getPayloadVersion() {
+ return payloadVersion;
+ }
- AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.Builder arpBuilder =
- AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.newBuilder();
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("AppendEntries [leaderId=").append(leaderId).append(", prevLogIndex=").append(prevLogIndex)
+ .append(", prevLogTerm=").append(prevLogTerm).append(", leaderCommit=").append(leaderCommit)
+ .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append(", payloadVersion=")
+ .append(payloadVersion).append(", entries=").append(entries).append("]");
+ return builder.toString();
+ }
- //get the client specific payload extensions and add them to the payload builder
- Map<GeneratedMessage.GeneratedExtension, T> map = logEntry.getData().encode();
- Iterator<Map.Entry<GeneratedMessage.GeneratedExtension, T>> iter = map.entrySet().iterator();
+ private Object writeReplace() {
+ return new Proxy(this);
+ }
- while (iter.hasNext()) {
- Map.Entry<GeneratedMessage.GeneratedExtension, T> entry = iter.next();
- arpBuilder.setExtension(entry.getKey(), entry.getValue());
- }
+ private static class Proxy implements Externalizable {
+ private static final long serialVersionUID = 1L;
- arpBuilder.setClientPayloadClassName(logEntry.getData().getClientPayloadClassName());
+ private AppendEntries appendEntries;
- arBuilder.setData(arpBuilder).setIndex(logEntry.getIndex()).setTerm(logEntry.getTerm());
- to.addLogEntries(arBuilder);
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public Proxy() {
}
- return to.build();
- }
+ Proxy(AppendEntries appendEntries) {
+ this.appendEntries = appendEntries;
+ }
- public static AppendEntries fromSerializable(Object o){
- AppendEntriesMessages.AppendEntries from = (AppendEntriesMessages.AppendEntries) o;
-
- List<ReplicatedLogEntry> logEntryList = new ArrayList<>();
- for (AppendEntriesMessages.AppendEntries.ReplicatedLogEntry leProtoBuff : from.getLogEntriesList()) {
-
- Payload payload = null ;
- try {
- if(leProtoBuff.getData() != null && leProtoBuff.getData().getClientPayloadClassName() != null) {
- String clientPayloadClassName = leProtoBuff.getData().getClientPayloadClassName();
- payload = (Payload) Class.forName(clientPayloadClassName).newInstance();
- payload = payload.decode(leProtoBuff.getData());
- payload.setClientPayloadClassName(clientPayloadClassName);
- } else {
- LOG.error("Payload is null or payload does not have client payload class name");
- }
-
- } catch (InstantiationException e) {
- LOG.error("InstantiationException when instantiating "+leProtoBuff.getData().getClientPayloadClassName(), e);
- } catch (IllegalAccessException e) {
- LOG.error("IllegalAccessException when accessing "+leProtoBuff.getData().getClientPayloadClassName(), e);
- } catch (ClassNotFoundException e) {
- LOG.error("ClassNotFoundException when loading "+leProtoBuff.getData().getClientPayloadClassName(), e);
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeLong(appendEntries.getTerm());
+ out.writeObject(appendEntries.leaderId);
+ out.writeLong(appendEntries.prevLogTerm);
+ out.writeLong(appendEntries.prevLogIndex);
+ out.writeLong(appendEntries.leaderCommit);
+ out.writeLong(appendEntries.replicatedToAllIndex);
+ out.writeShort(appendEntries.payloadVersion);
+
+ out.writeInt(appendEntries.entries.size());
+ for (ReplicatedLogEntry e: appendEntries.entries) {
+ out.writeLong(e.getIndex());
+ out.writeLong(e.getTerm());
+ out.writeObject(e.getData());
}
- ReplicatedLogEntry logEntry = new ReplicatedLogImplEntry(
- leProtoBuff.getIndex(), leProtoBuff.getTerm(), payload);
- logEntryList.add(logEntry);
}
- AppendEntries to = new AppendEntries(from.getTerm(),
- from.getLeaderId(),
- from.getPrevLogIndex(),
- from.getPrevLogTerm(),
- logEntryList,
- from.getLeaderCommit());
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ long term = in.readLong();
+ String leaderId = (String) in.readObject();
+ long prevLogTerm = in.readLong();
+ long prevLogIndex = in.readLong();
+ long leaderCommit = in.readLong();
+ long replicatedToAllIndex = in.readLong();
+ short payloadVersion = in.readShort();
+
+ int size = in.readInt();
+ List<ReplicatedLogEntry> entries = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
+ }
- return to;
+ appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
+ replicatedToAllIndex, payloadVersion);
+ }
+
+ private Object readResolve() {
+ return appendEntries;
+ }
}
}