Merge "Do not override jsr305 version"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / messages / AppendEntries.java
index 6665d7549b0d82abe752bcd50aed1826455b331e..d2ea0c50cd6203fd6b142793612e4e0546000185 100644 (file)
@@ -9,23 +9,28 @@
 package org.opendaylight.controller.cluster.raft.messages;
 
 import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 
 /**
  * Invoked by leader to replicate log entries (§5.3); also used as
  * heartbeat (§5.2).
  */
 public class AppendEntries extends AbstractRaftRPC {
+    private static final long serialVersionUID = 1L;
 
-    public static final Class SERIALIZABLE_CLASS = AppendEntriesMessages.AppendEntries.class;
+    public static final Class<AppendEntriesMessages.AppendEntries> LEGACY_SERIALIZABLE_CLASS =
+            AppendEntriesMessages.AppendEntries.class;
 
     private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(AppendEntries.class);
 
@@ -40,19 +45,45 @@ public class AppendEntries extends AbstractRaftRPC {
 
     // log entries to store (empty for heartbeat;
     // may send more than one for efficiency)
-    private final List<ReplicatedLogEntry> entries;
+    private transient List<ReplicatedLogEntry> entries;
 
     // leader's commitIndex
     private final long leaderCommit;
 
+    // index which has been replicated successfully to all followers, -1 if none
+    private final long replicatedToAllIndex;
+
     public AppendEntries(long term, String leaderId, long prevLogIndex,
-        long prevLogTerm, List<ReplicatedLogEntry> entries, long leaderCommit) {
+        long prevLogTerm, List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex) {
         super(term);
         this.leaderId = leaderId;
         this.prevLogIndex = prevLogIndex;
         this.prevLogTerm = prevLogTerm;
         this.entries = entries;
         this.leaderCommit = leaderCommit;
+        this.replicatedToAllIndex = replicatedToAllIndex;
+    }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.writeShort(RaftVersions.CURRENT_VERSION);
+        out.defaultWriteObject();
+
+        out.writeInt(entries.size());
+        for(ReplicatedLogEntry e: entries) {
+            out.writeObject(e);
+        }
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.readShort(); // version
+
+        in.defaultReadObject();
+
+        int size = in.readInt();
+        entries = new ArrayList<>(size);
+        for(int i = 0; i < size; i++) {
+            entries.add((ReplicatedLogEntry) in.readObject());
+        }
     }
 
     public String getLeaderId() {
@@ -75,20 +106,35 @@ public class AppendEntries extends AbstractRaftRPC {
         return leaderCommit;
     }
 
-    @Override public String toString() {
-        final StringBuilder sb =
-            new StringBuilder("AppendEntries{");
-        sb.append("term=").append(getTerm());
-        sb.append("leaderId='").append(leaderId).append('\'');
-        sb.append(", prevLogIndex=").append(prevLogIndex);
-        sb.append(", prevLogTerm=").append(prevLogTerm);
-        sb.append(", entries=").append(entries);
-        sb.append(", leaderCommit=").append(leaderCommit);
-        sb.append('}');
-        return sb.toString();
+    public long getReplicatedToAllIndex() {
+        return replicatedToAllIndex;
+    }
+
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("AppendEntries [term=").append(term).append(", leaderId=").append(leaderId)
+                .append(", prevLogIndex=").append(prevLogIndex).append(", prevLogTerm=").append(prevLogTerm)
+                .append(", entries=").append(entries).append(", leaderCommit=").append(leaderCommit)
+                .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append("]");
+        return builder.toString();
+    }
+
+    public <T extends Object> Object toSerializable() {
+        return toSerializable(RaftVersions.CURRENT_VERSION);
     }
 
-    public <T extends Object> Object toSerializable(){
+    public <T extends Object> Object toSerializable(short version) {
+        if(version < RaftVersions.LITHIUM_VERSION) {
+            return toLegacySerializable();
+        } else {
+            return this;
+        }
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private <T> Object toLegacySerializable() {
         AppendEntriesMessages.AppendEntries.Builder to = AppendEntriesMessages.AppendEntries.newBuilder();
         to.setTerm(this.getTerm())
             .setLeaderId(this.getLeaderId())
@@ -122,9 +168,16 @@ public class AppendEntries extends AbstractRaftRPC {
         return to.build();
     }
 
-    public static AppendEntries fromSerializable(Object o){
-        AppendEntriesMessages.AppendEntries from = (AppendEntriesMessages.AppendEntries) o;
+    public static AppendEntries fromSerializable(Object serialized) {
+        if(serialized instanceof AppendEntries) {
+            return (AppendEntries)serialized;
+        }
+        else {
+            return fromLegacySerializable((AppendEntriesMessages.AppendEntries) serialized);
+        }
+    }
 
+    private static AppendEntries fromLegacySerializable(AppendEntriesMessages.AppendEntries from) {
         List<ReplicatedLogEntry> logEntryList = new ArrayList<>();
         for (AppendEntriesMessages.AppendEntries.ReplicatedLogEntry leProtoBuff : from.getLogEntriesList()) {
 
@@ -132,9 +185,8 @@ public class AppendEntries extends AbstractRaftRPC {
             try {
                 if(leProtoBuff.getData() != null && leProtoBuff.getData().getClientPayloadClassName() != null) {
                     String clientPayloadClassName = leProtoBuff.getData().getClientPayloadClassName();
-                    payload = (Payload)Class.forName(clientPayloadClassName).newInstance();
+                    payload = (Payload) Class.forName(clientPayloadClassName).newInstance();
                     payload = payload.decode(leProtoBuff.getData());
-                    payload.setClientPayloadClassName(clientPayloadClassName);
                 } else {
                     LOG.error("Payload is null or payload does not have client payload class name");
                 }
@@ -156,8 +208,12 @@ public class AppendEntries extends AbstractRaftRPC {
             from.getPrevLogIndex(),
             from.getPrevLogTerm(),
             logEntryList,
-            from.getLeaderCommit());
+            from.getLeaderCommit(), -1);
 
         return to;
     }
+
+    public static boolean isSerializedType(Object message) {
+        return message instanceof AppendEntries || LEGACY_SERIALIZABLE_CLASS.isInstance(message);
+    }
 }