X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fmessages%2FAppendEntries.java;h=dc1ebf0730fd49bd7b3b73ba3c54e4c5127bc02c;hp=896f3048bc83909512ef7d1493404c7352c00ab1;hb=baa92613648863a7d839be36f89cc30431d5a66a;hpb=a1a1447f1cad31eb9efcb938e0e626b8d229f0d9 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java index 896f3048bc..dc1ebf0730 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java @@ -8,13 +8,15 @@ package org.opendaylight.controller.cluster.raft.messages; +import java.io.Externalizable; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; +import java.io.ObjectInput; +import java.io.ObjectOutput; import java.util.ArrayList; import java.util.List; -import org.opendaylight.controller.cluster.raft.RaftVersions; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; /** * Invoked by leader to replicate log entries (§5.3); also used as @@ -23,8 +25,6 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; public class AppendEntries extends AbstractRaftRPC { private static final long serialVersionUID = 1L; - private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(AppendEntries.class); - // So that follower can redirect clients private final String leaderId; @@ -58,28 +58,6 @@ public class AppendEntries extends AbstractRaftRPC { this.payloadVersion = payloadVersion; } - private void writeObject(ObjectOutputStream out) throws IOException { - out.writeShort(RaftVersions.CURRENT_VERSION); - out.defaultWriteObject(); - - out.writeInt(entries.size()); - for(ReplicatedLogEntry e: entries) { - out.writeObject(e); - } - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.readShort(); // raft version - - in.defaultReadObject(); - - int size = in.readInt(); - entries = new ArrayList<>(size); - for(int i = 0; i < size; i++) { - entries.add((ReplicatedLogEntry) in.readObject()); - } - } - public String getLeaderId() { return leaderId; } @@ -117,4 +95,63 @@ public class AppendEntries extends AbstractRaftRPC { .append(payloadVersion).append(", entries=").append(entries).append("]"); return builder.toString(); } + + private Object writeReplace() { + return new Proxy(this); + } + + private static class Proxy implements Externalizable { + private static final long serialVersionUID = 1L; + + private AppendEntries appendEntries; + + public Proxy() { + } + + Proxy(AppendEntries appendEntries) { + this.appendEntries = appendEntries; + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeLong(appendEntries.getTerm()); + out.writeObject(appendEntries.leaderId); + out.writeLong(appendEntries.prevLogTerm); + out.writeLong(appendEntries.prevLogIndex); + out.writeLong(appendEntries.leaderCommit); + out.writeLong(appendEntries.replicatedToAllIndex); + out.writeShort(appendEntries.payloadVersion); + + out.writeInt(appendEntries.entries.size()); + for(ReplicatedLogEntry e: appendEntries.entries) { + out.writeLong(e.getIndex()); + out.writeLong(e.getTerm()); + out.writeObject(e.getData()); + } + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + long term = in.readLong(); + String leaderId = (String) in.readObject(); + long prevLogTerm = in.readLong(); + long prevLogIndex = in.readLong(); + long leaderCommit = in.readLong(); + long replicatedToAllIndex = in.readLong(); + short payloadVersion = in.readShort(); + + int size = in.readInt(); + List entries = new ArrayList<>(size); + for(int i = 0; i < size; i++) { + entries.add(new ReplicatedLogImplEntry(in.readLong(), in.readLong(), (Payload) in.readObject())); + } + + appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, + replicatedToAllIndex, payloadVersion); + } + + private Object readResolve() { + return appendEntries; + } + } }