Move {Identifiable,Persistent,}Payload
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / messages / AppendEntries.java
index 94366efd5e897657744df37b0a475a847273e1b4..95965d560f4e02c319c2fadb56394f5ecc0a2ceb 100644 (file)
@@ -5,32 +5,32 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.raft.messages;
 
-import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages;
+import static java.util.Objects.requireNonNull;
 
+import com.google.common.annotations.VisibleForTesting;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
+import java.util.Optional;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
 
 /**
- * Invoked by leader to replicate log entries (§5.3); also used as
- * heartbeat (§5.2).
+ * Invoked by leader to replicate log entries (§5.3); also used as heartbeat (§5.2).
  */
-public class AppendEntries extends AbstractRaftRPC {
-
-    public static final Class SERIALIZABLE_CLASS = AppendEntriesMessages.AppendEntries.class;
-
-    private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(AppendEntries.class);
+public final class AppendEntries extends AbstractRaftRPC {
+    private static final long serialVersionUID = 1L;
 
     // So that follower can redirect clients
-    private final String leaderId;
+    private final @NonNull String leaderId;
 
     // Index of log entry immediately preceding new ones
     private final long prevLogIndex;
@@ -38,24 +38,57 @@ public class AppendEntries extends AbstractRaftRPC {
     // term of prevLogIndex entry
     private final long prevLogTerm;
 
-    // log entries to store (empty for heartbeat;
-    // may send more than one for efficiency)
-    private final List<ReplicatedLogEntry> entries;
+    // log entries to store (empty for heart beat - may send more than one for efficiency)
+    private final @NonNull List<ReplicatedLogEntry> entries;
 
     // leader's commitIndex
     private final long leaderCommit;
 
-    public AppendEntries(long term, String leaderId, long prevLogIndex,
-        long prevLogTerm, List<ReplicatedLogEntry> entries, long leaderCommit) {
+    // index which has been replicated successfully to all followers, -1 if none
+    private final long replicatedToAllIndex;
+
+    private final short payloadVersion;
+
+    private final short recipientRaftVersion;
+
+    private final short leaderRaftVersion;
+
+    private final String leaderAddress;
+
+    private AppendEntries(final long term, @NonNull final String leaderId, final long prevLogIndex,
+            final long prevLogTerm, @NonNull final List<ReplicatedLogEntry> entries, final long leaderCommit,
+            final long replicatedToAllIndex, final short payloadVersion, final short recipientRaftVersion,
+            final short leaderRaftVersion, @Nullable final String leaderAddress) {
         super(term);
-        this.leaderId = leaderId;
+        this.leaderId = requireNonNull(leaderId);
         this.prevLogIndex = prevLogIndex;
         this.prevLogTerm = prevLogTerm;
-        this.entries = entries;
+        this.entries = requireNonNull(entries);
         this.leaderCommit = leaderCommit;
+        this.replicatedToAllIndex = replicatedToAllIndex;
+        this.payloadVersion = payloadVersion;
+        this.recipientRaftVersion = recipientRaftVersion;
+        this.leaderRaftVersion = leaderRaftVersion;
+        this.leaderAddress = leaderAddress;
     }
 
-    public String getLeaderId() {
+    public AppendEntries(final long term, final @NonNull String leaderId, final long prevLogIndex,
+            final long prevLogTerm, final @NonNull List<ReplicatedLogEntry> entries, final long leaderCommit,
+            final long replicatedToAllIndex, final short payloadVersion, final short recipientRaftVersion,
+            final @Nullable String leaderAddress) {
+        this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
+                recipientRaftVersion, RaftVersions.CURRENT_VERSION, leaderAddress);
+    }
+
+    @VisibleForTesting
+    public AppendEntries(final long term, final @NonNull String leaderId, final long prevLogIndex,
+            final long prevLogTerm, final @NonNull List<ReplicatedLogEntry> entries, final long leaderCommit,
+            final long replicatedToAllIndex, final short payloadVersion) {
+        this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
+                RaftVersions.CURRENT_VERSION, null);
+    }
+
+    public @NonNull String getLeaderId() {
         return leaderId;
     }
 
@@ -67,7 +100,7 @@ public class AppendEntries extends AbstractRaftRPC {
         return prevLogTerm;
     }
 
-    public List<ReplicatedLogEntry> getEntries() {
+    public @NonNull List<ReplicatedLogEntry> getEntries() {
         return entries;
     }
 
@@ -75,89 +108,168 @@ public class AppendEntries extends AbstractRaftRPC {
         return leaderCommit;
     }
 
-    @Override public String toString() {
-        final StringBuilder sb =
-            new StringBuilder("AppendEntries{");
-        sb.append("term=").append(getTerm());
-        sb.append("leaderId='").append(leaderId).append('\'');
-        sb.append(", prevLogIndex=").append(prevLogIndex);
-        sb.append(", prevLogTerm=").append(prevLogTerm);
-        sb.append(", entries=").append(entries);
-        sb.append(", leaderCommit=").append(leaderCommit);
-        sb.append('}');
-        return sb.toString();
+    public long getReplicatedToAllIndex() {
+        return replicatedToAllIndex;
     }
 
-    public <T extends Object> Object toSerializable(){
-        AppendEntriesMessages.AppendEntries.Builder to = AppendEntriesMessages.AppendEntries.newBuilder();
-        to.setTerm(this.getTerm())
-            .setLeaderId(this.getLeaderId())
-            .setPrevLogTerm(this.getPrevLogTerm())
-            .setPrevLogIndex(this.getPrevLogIndex())
-            .setLeaderCommit(this.getLeaderCommit());
+    public short getPayloadVersion() {
+        return payloadVersion;
+    }
 
-        for (ReplicatedLogEntry logEntry : this.getEntries()) {
+    public Optional<String> getLeaderAddress() {
+        return Optional.ofNullable(leaderAddress);
+    }
 
-            AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Builder arBuilder =
-                AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.newBuilder();
+    public short getLeaderRaftVersion() {
+        return leaderRaftVersion;
+    }
 
-            AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.Builder arpBuilder =
-                AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.newBuilder();
+    @Override
+    public String toString() {
+        return "AppendEntries [leaderId=" + leaderId
+                + ", prevLogIndex=" + prevLogIndex
+                + ", prevLogTerm=" + prevLogTerm
+                + ", leaderCommit=" + leaderCommit
+                + ", replicatedToAllIndex=" + replicatedToAllIndex
+                + ", payloadVersion=" + payloadVersion
+                + ", recipientRaftVersion=" + recipientRaftVersion
+                + ", leaderRaftVersion=" + leaderRaftVersion
+                + ", leaderAddress=" + leaderAddress
+                + ", entries=" + entries + "]";
+    }
 
-            //get the client specific payload extensions and add them to the payload builder
-            Map<GeneratedMessage.GeneratedExtension, T> map = logEntry.getData().encode();
-            Iterator<Map.Entry<GeneratedMessage.GeneratedExtension, T>> iter = map.entrySet().iterator();
+    @Override
+    Object writeReplace() {
+        return recipientRaftVersion >= RaftVersions.FLUORINE_VERSION ? new ProxyV2(this) : new Proxy(this);
+    }
 
-            while (iter.hasNext()) {
-                Map.Entry<GeneratedMessage.GeneratedExtension, T> entry = iter.next();
-                arpBuilder.setExtension(entry.getKey(), entry.getValue());
+    /**
+     * Fluorine version that adds the leader address.
+     */
+    private static class ProxyV2 implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private AppendEntries appendEntries;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public ProxyV2() {
+        }
+
+        ProxyV2(final AppendEntries appendEntries) {
+            this.appendEntries = appendEntries;
+        }
+
+        @Override
+        public void writeExternal(final ObjectOutput out) throws IOException {
+            out.writeShort(appendEntries.leaderRaftVersion);
+            out.writeLong(appendEntries.getTerm());
+            out.writeObject(appendEntries.leaderId);
+            out.writeLong(appendEntries.prevLogTerm);
+            out.writeLong(appendEntries.prevLogIndex);
+            out.writeLong(appendEntries.leaderCommit);
+            out.writeLong(appendEntries.replicatedToAllIndex);
+            out.writeShort(appendEntries.payloadVersion);
+
+            out.writeInt(appendEntries.entries.size());
+            for (ReplicatedLogEntry e: appendEntries.entries) {
+                out.writeLong(e.getIndex());
+                out.writeLong(e.getTerm());
+                out.writeObject(e.getData());
+            }
+
+            out.writeObject(appendEntries.leaderAddress);
+        }
+
+        @Override
+        public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+            short leaderRaftVersion = in.readShort();
+            long term = in.readLong();
+            String leaderId = (String) in.readObject();
+            long prevLogTerm = in.readLong();
+            long prevLogIndex = in.readLong();
+            long leaderCommit = in.readLong();
+            long replicatedToAllIndex = in.readLong();
+            short payloadVersion = in.readShort();
+
+            int size = in.readInt();
+            List<ReplicatedLogEntry> entries = new ArrayList<>(size);
+            for (int i = 0; i < size; i++) {
+                entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
             }
 
-            arpBuilder.setClientPayloadClassName(logEntry.getData().getClientPayloadClassName());
+            String leaderAddress = (String)in.readObject();
 
-            arBuilder.setData(arpBuilder).setIndex(logEntry.getIndex()).setTerm(logEntry.getTerm());
-            to.addLogEntries(arBuilder);
+            appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
+                    replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, leaderRaftVersion,
+                    leaderAddress);
         }
 
-        return to.build();
+        private Object readResolve() {
+            return appendEntries;
+        }
     }
 
-    public static AppendEntries fromSerializable(Object o){
-        AppendEntriesMessages.AppendEntries from = (AppendEntriesMessages.AppendEntries) o;
+    /**
+     * Pre-Fluorine version.
+     */
+    @Deprecated
+    private static class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
 
-        List<ReplicatedLogEntry> logEntryList = new ArrayList<>();
-        for (AppendEntriesMessages.AppendEntries.ReplicatedLogEntry leProtoBuff : from.getLogEntriesList()) {
+        private AppendEntries appendEntries;
 
-            Payload payload = null ;
-            try {
-                if(leProtoBuff.getData() != null && leProtoBuff.getData().getClientPayloadClassName() != null) {
-                    String clientPayloadClassName = leProtoBuff.getData().getClientPayloadClassName();
-                    payload = (Payload)Class.forName(clientPayloadClassName).newInstance();
-                    payload = payload.decode(leProtoBuff.getData());
-                    payload.setClientPayloadClassName(clientPayloadClassName);
-                } else {
-                    LOG.error("Payload is null or payload does not have client payload class name");
-                }
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+        }
+
+        Proxy(final AppendEntries appendEntries) {
+            this.appendEntries = appendEntries;
+        }
 
-            } catch (InstantiationException e) {
-                LOG.error("InstantiationException when instantiating "+leProtoBuff.getData().getClientPayloadClassName(), e);
-            } catch (IllegalAccessException e) {
-                LOG.error("IllegalAccessException when accessing "+leProtoBuff.getData().getClientPayloadClassName(), e);
-            } catch (ClassNotFoundException e) {
-                LOG.error("ClassNotFoundException when loading "+leProtoBuff.getData().getClientPayloadClassName(), e);
+        @Override
+        public void writeExternal(final ObjectOutput out) throws IOException {
+            out.writeLong(appendEntries.getTerm());
+            out.writeObject(appendEntries.leaderId);
+            out.writeLong(appendEntries.prevLogTerm);
+            out.writeLong(appendEntries.prevLogIndex);
+            out.writeLong(appendEntries.leaderCommit);
+            out.writeLong(appendEntries.replicatedToAllIndex);
+            out.writeShort(appendEntries.payloadVersion);
+
+            out.writeInt(appendEntries.entries.size());
+            for (ReplicatedLogEntry e: appendEntries.entries) {
+                out.writeLong(e.getIndex());
+                out.writeLong(e.getTerm());
+                out.writeObject(e.getData());
             }
-            ReplicatedLogEntry logEntry = new ReplicatedLogImplEntry(
-                leProtoBuff.getIndex(), leProtoBuff.getTerm(), payload);
-            logEntryList.add(logEntry);
         }
 
-        AppendEntries to = new AppendEntries(from.getTerm(),
-            from.getLeaderId(),
-            from.getPrevLogIndex(),
-            from.getPrevLogTerm(),
-            logEntryList,
-            from.getLeaderCommit());
+        @Override
+        public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+            long term = in.readLong();
+            String leaderId = (String) in.readObject();
+            long prevLogTerm = in.readLong();
+            long prevLogIndex = in.readLong();
+            long leaderCommit = in.readLong();
+            long replicatedToAllIndex = in.readLong();
+            short payloadVersion = in.readShort();
+
+            int size = in.readInt();
+            List<ReplicatedLogEntry> entries = new ArrayList<>(size);
+            for (int i = 0; i < size; i++) {
+                entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
+            }
+
+            appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
+                replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, RaftVersions.BORON_VERSION, null);
+        }
 
-        return to;
+        private Object readResolve() {
+            return appendEntries;
+        }
     }
 }