X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fmessages%2FAppendEntries.java;h=45ac9a9ebfbecbb2fb13881dd3f365f8ac9f7e8e;hp=fd4e0a274c6f9122e40809e7b120f6a7a8b936bd;hb=ed693440aa741fee9b94447f8404d89b4020f616;hpb=583f6075e842a6a37b83bd01e478aebc70c6af73 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java index fd4e0a274c..45ac9a9ebf 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java @@ -8,13 +8,28 @@ package org.opendaylight.controller.cluster.raft.messages; +import com.google.protobuf.GeneratedMessage; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.Map; /** * Invoked by leader to replicate log entries (§5.3); also used as * heartbeat (§5.2). */ -public class AppendEntries extends AbstractRaftRPC { +public class AppendEntries extends AbstractRaftRPC implements Serializable { + + public static final Class SERIALIZABLE_CLASS = AppendEntriesMessages.AppendEntries.class; + + private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(AppendEntries.class); + // So that follower can redirect clients private final String leaderId; @@ -26,13 +41,13 @@ public class AppendEntries extends AbstractRaftRPC { // log entries to store (empty for heartbeat; // may send more than one for efficiency) - private final List entries; + private final List entries; // leader's commitIndex private final long leaderCommit; public AppendEntries(long term, String leaderId, long prevLogIndex, - long prevLogTerm, List entries, long leaderCommit) { + long prevLogTerm, List entries, long leaderCommit) { super(term); this.leaderId = leaderId; this.prevLogIndex = prevLogIndex; @@ -53,11 +68,94 @@ public class AppendEntries extends AbstractRaftRPC { return prevLogTerm; } - public List getEntries() { + public List getEntries() { return entries; } public long getLeaderCommit() { return leaderCommit; } + + @Override public String toString() { + return "AppendEntries{" + + "leaderId='" + leaderId + '\'' + + ", prevLogIndex=" + prevLogIndex + + ", prevLogTerm=" + prevLogTerm + + ", entries=" + entries + + ", leaderCommit=" + leaderCommit + + '}'; + } + + public Object toSerializable(){ + AppendEntriesMessages.AppendEntries.Builder to = AppendEntriesMessages.AppendEntries.newBuilder(); + to.setTerm(this.getTerm()) + .setLeaderId(this.getLeaderId()) + .setPrevLogTerm(this.getPrevLogTerm()) + .setPrevLogIndex(this.getPrevLogIndex()) + .setLeaderCommit(this.getLeaderCommit()); + + for (ReplicatedLogEntry logEntry : this.getEntries()) { + + AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Builder arBuilder = + AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.newBuilder(); + + AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.Builder arpBuilder = + AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.newBuilder(); + + //get the client specific payload extensions and add them to the payload builder + Map map = logEntry.getData().encode(); + Iterator> iter = map.entrySet().iterator(); + + while (iter.hasNext()) { + Map.Entry entry = iter.next(); + arpBuilder.setExtension(entry.getKey(), entry.getValue()); + } + + arpBuilder.setClientPayloadClassName(logEntry.getData().getClientPayloadClassName()); + + arBuilder.setData(arpBuilder).setIndex(logEntry.getIndex()).setTerm(logEntry.getTerm()); + to.addLogEntries(arBuilder); + } + + return to.build(); + } + + public static AppendEntries fromSerializable(Object o){ + AppendEntriesMessages.AppendEntries from = (AppendEntriesMessages.AppendEntries) o; + + List logEntryList = new ArrayList<>(); + for (AppendEntriesMessages.AppendEntries.ReplicatedLogEntry leProtoBuff : from.getLogEntriesList()) { + + Payload payload = null ; + try { + if(leProtoBuff.getData() != null && leProtoBuff.getData().getClientPayloadClassName() != null) { + String clientPayloadClassName = leProtoBuff.getData().getClientPayloadClassName(); + payload = (Payload)Class.forName(clientPayloadClassName).newInstance(); + payload = payload.decode(leProtoBuff.getData()); + payload.setClientPayloadClassName(clientPayloadClassName); + } else { + LOG.error("Payload is null or payload does not have client payload class name"); + } + + } catch (InstantiationException e) { + LOG.error("InstantiationException when instantiating "+leProtoBuff.getData().getClientPayloadClassName(), e); + } catch (IllegalAccessException e) { + LOG.error("IllegalAccessException when accessing "+leProtoBuff.getData().getClientPayloadClassName(), e); + } catch (ClassNotFoundException e) { + LOG.error("ClassNotFoundException when loading "+leProtoBuff.getData().getClientPayloadClassName(), e); + } + ReplicatedLogEntry logEntry = new ReplicatedLogImplEntry( + leProtoBuff.getIndex(), leProtoBuff.getTerm(), payload); + logEntryList.add(logEntry); + } + + AppendEntries to = new AppendEntries(from.getTerm(), + from.getLeaderId(), + from.getPrevLogIndex(), + from.getPrevLogTerm(), + logEntryList, + from.getLeaderCommit()); + + return to; + } }