X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fmessages%2FAppendEntries.java;h=896f3048bc83909512ef7d1493404c7352c00ab1;hb=c506fa90f99febfdf25a21bdd0b8359d7e2f3ab7;hp=97bcd6a708b7c8f11646b026e7e1653a2713ff40;hpb=fc54ab8853d36fb1d7aebf2a09ef10567e66aa0d;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java index 97bcd6a708..896f3048bc 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java @@ -8,19 +8,13 @@ package org.opendaylight.controller.cluster.raft.messages; -import com.google.protobuf.GeneratedMessage; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; -import java.util.Map; import org.opendaylight.controller.cluster.raft.RaftVersions; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; -import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; /** * Invoked by leader to replicate log entries (§5.3); also used as @@ -29,9 +23,6 @@ import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntries public class AppendEntries extends AbstractRaftRPC { private static final long serialVersionUID = 1L; - public static final Class LEGACY_SERIALIZABLE_CLASS = - AppendEntriesMessages.AppendEntries.class; - private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(AppendEntries.class); // So that follower can redirect clients @@ -53,8 +44,10 @@ public class AppendEntries extends AbstractRaftRPC { // index which has been replicated successfully to all followers, -1 if none private final long replicatedToAllIndex; - public AppendEntries(long term, String leaderId, long prevLogIndex, - long prevLogTerm, List entries, long leaderCommit, long replicatedToAllIndex) { + private final short payloadVersion; + + public AppendEntries(long term, String leaderId, long prevLogIndex, long prevLogTerm, + List entries, long leaderCommit, long replicatedToAllIndex, short payloadVersion) { super(term); this.leaderId = leaderId; this.prevLogIndex = prevLogIndex; @@ -62,6 +55,7 @@ public class AppendEntries extends AbstractRaftRPC { this.entries = entries; this.leaderCommit = leaderCommit; this.replicatedToAllIndex = replicatedToAllIndex; + this.payloadVersion = payloadVersion; } private void writeObject(ObjectOutputStream out) throws IOException { @@ -75,7 +69,7 @@ public class AppendEntries extends AbstractRaftRPC { } private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.readShort(); // version + in.readShort(); // raft version in.defaultReadObject(); @@ -110,114 +104,17 @@ public class AppendEntries extends AbstractRaftRPC { return replicatedToAllIndex; } - @Override - public String toString() { - final StringBuilder sb = - new StringBuilder("AppendEntries{"); - sb.append("term=").append(getTerm()); - sb.append("leaderId='").append(leaderId).append('\''); - sb.append(", prevLogIndex=").append(prevLogIndex); - sb.append(", prevLogTerm=").append(prevLogTerm); - sb.append(", entries=").append(entries); - sb.append(", leaderCommit=").append(leaderCommit); - sb.append(", replicatedToAllIndex=").append(replicatedToAllIndex); - sb.append('}'); - return sb.toString(); - } - - public Object toSerializable() { - return toSerializable(RaftVersions.CURRENT_VERSION); - } - - public Object toSerializable(short version) { - if(version < RaftVersions.LITHIUM_VERSION) { - return toLegacySerializable(); - } else { - return this; - } + public short getPayloadVersion() { + return payloadVersion; } - @SuppressWarnings({ "rawtypes", "unchecked" }) - private Object toLegacySerializable() { - AppendEntriesMessages.AppendEntries.Builder to = AppendEntriesMessages.AppendEntries.newBuilder(); - to.setTerm(this.getTerm()) - .setLeaderId(this.getLeaderId()) - .setPrevLogTerm(this.getPrevLogTerm()) - .setPrevLogIndex(this.getPrevLogIndex()) - .setLeaderCommit(this.getLeaderCommit()); - - for (ReplicatedLogEntry logEntry : this.getEntries()) { - - AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Builder arBuilder = - AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.newBuilder(); - - AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.Builder arpBuilder = - AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.newBuilder(); - - //get the client specific payload extensions and add them to the payload builder - Map map = logEntry.getData().encode(); - Iterator> iter = map.entrySet().iterator(); - - while (iter.hasNext()) { - Map.Entry entry = iter.next(); - arpBuilder.setExtension(entry.getKey(), entry.getValue()); - } - - arpBuilder.setClientPayloadClassName(logEntry.getData().getClientPayloadClassName()); - - arBuilder.setData(arpBuilder).setIndex(logEntry.getIndex()).setTerm(logEntry.getTerm()); - to.addLogEntries(arBuilder); - } - - return to.build(); - } - - public static AppendEntries fromSerializable(Object serialized) { - if(serialized instanceof AppendEntries) { - return (AppendEntries)serialized; - } - else { - return fromLegacySerializable((AppendEntriesMessages.AppendEntries) serialized); - } - } - - private static AppendEntries fromLegacySerializable(AppendEntriesMessages.AppendEntries from) { - List logEntryList = new ArrayList<>(); - for (AppendEntriesMessages.AppendEntries.ReplicatedLogEntry leProtoBuff : from.getLogEntriesList()) { - - Payload payload = null ; - try { - if(leProtoBuff.getData() != null && leProtoBuff.getData().getClientPayloadClassName() != null) { - String clientPayloadClassName = leProtoBuff.getData().getClientPayloadClassName(); - payload = (Payload) Class.forName(clientPayloadClassName).newInstance(); - payload = payload.decode(leProtoBuff.getData()); - } else { - LOG.error("Payload is null or payload does not have client payload class name"); - } - - } catch (InstantiationException e) { - LOG.error("InstantiationException when instantiating "+leProtoBuff.getData().getClientPayloadClassName(), e); - } catch (IllegalAccessException e) { - LOG.error("IllegalAccessException when accessing "+leProtoBuff.getData().getClientPayloadClassName(), e); - } catch (ClassNotFoundException e) { - LOG.error("ClassNotFoundException when loading "+leProtoBuff.getData().getClientPayloadClassName(), e); - } - ReplicatedLogEntry logEntry = new ReplicatedLogImplEntry( - leProtoBuff.getIndex(), leProtoBuff.getTerm(), payload); - logEntryList.add(logEntry); - } - - AppendEntries to = new AppendEntries(from.getTerm(), - from.getLeaderId(), - from.getPrevLogIndex(), - from.getPrevLogTerm(), - logEntryList, - from.getLeaderCommit(), -1); - - return to; - } - - public static boolean isSerializedType(Object message) { - return message instanceof AppendEntries || LEGACY_SERIALIZABLE_CLASS.isInstance(message); + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("AppendEntries [leaderId=").append(leaderId).append(", prevLogIndex=").append(prevLogIndex) + .append(", prevLogTerm=").append(prevLogTerm).append(", leaderCommit=").append(leaderCommit) + .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append(", payloadVersion=") + .append(payloadVersion).append(", entries=").append(entries).append("]"); + return builder.toString(); } }