--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+/**
+ * @author Thomas Pantelis
+ */
+public interface RaftVersions {
+ short HELIUM_VERSION = 0;
+ short LITHIUM_VERSION = 1;
+ short CURRENT_VERSION = LITHIUM_VERSION;
+}
import java.io.Serializable;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-public class ReplicatedLogImplEntry implements ReplicatedLogEntry,
- Serializable {
- private static final long serialVersionUID = 1L;
+public class ReplicatedLogImplEntry implements ReplicatedLogEntry, Serializable {
+ private static final long serialVersionUID = -9085798014576489130L;
private final long index;
private final long term;
this.payload = payload;
}
- @Override public Payload getData() {
+ @Override
+ public Payload getData() {
return payload;
}
- @Override public long getTerm() {
+ @Override
+ public long getTerm() {
return term;
}
- @Override public long getIndex() {
+ @Override
+ public long getIndex() {
return index;
}
return getData().size();
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "Entry{" +
"index=" + index +
", term=" + term +
public class SerializationUtils {
public static Object fromSerializable(Object serializable){
- if(serializable.getClass().equals(AppendEntries.SERIALIZABLE_CLASS)){
+ if(AppendEntries.isSerializedType(serializable)){
return AppendEntries.fromSerializable(serializable);
} else if (serializable.getClass().equals(InstallSnapshot.SERIALIZABLE_CLASS)) {
package org.opendaylight.controller.cluster.raft.messages;
public class AbstractRaftRPC implements RaftRPC {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = -6061342433962854822L;
+
// term
protected long term;
public AbstractRaftRPC() {
}
+ @Override
public long getTerm() {
return term;
}
package org.opendaylight.controller.cluster.raft.messages;
import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
/**
* Invoked by leader to replicate log entries (§5.3); also used as
* heartbeat (§5.2).
*/
public class AppendEntries extends AbstractRaftRPC {
- public static final Class<AppendEntriesMessages.AppendEntries> SERIALIZABLE_CLASS = AppendEntriesMessages.AppendEntries.class;
+ private static final long serialVersionUID = 1L;
+
+ public static final Class<AppendEntriesMessages.AppendEntries> LEGACY_SERIALIZABLE_CLASS =
+ AppendEntriesMessages.AppendEntries.class;
private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(AppendEntries.class);
- private static final long serialVersionUID = 1L;
// So that follower can redirect clients
private final String leaderId;
// log entries to store (empty for heartbeat;
// may send more than one for efficiency)
- private final List<ReplicatedLogEntry> entries;
+ private transient List<ReplicatedLogEntry> entries;
// leader's commitIndex
private final long leaderCommit;
this.leaderCommit = leaderCommit;
}
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.writeShort(RaftVersions.CURRENT_VERSION);
+ out.defaultWriteObject();
+
+ out.writeInt(entries.size());
+ for(ReplicatedLogEntry e: entries) {
+ out.writeObject(e);
+ }
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.readShort(); // version
+
+ in.defaultReadObject();
+
+ int size = in.readInt();
+ entries = new ArrayList<>(size);
+ for(int i = 0; i < size; i++) {
+ entries.add((ReplicatedLogEntry) in.readObject());
+ }
+ }
+
public String getLeaderId() {
return leaderId;
}
return leaderCommit;
}
- @Override public String toString() {
+ @Override
+ public String toString() {
final StringBuilder sb =
new StringBuilder("AppendEntries{");
sb.append("term=").append(getTerm());
return sb.toString();
}
- public <T extends Object> Object toSerializable(){
+ public <T extends Object> Object toSerializable() {
+ return toSerializable(RaftVersions.CURRENT_VERSION);
+ }
+
+ public <T extends Object> Object toSerializable(short version) {
+ if(version < RaftVersions.LITHIUM_VERSION) {
+ return toLegacySerializable();
+ } else {
+ return this;
+ }
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private <T> Object toLegacySerializable() {
AppendEntriesMessages.AppendEntries.Builder to = AppendEntriesMessages.AppendEntries.newBuilder();
to.setTerm(this.getTerm())
.setLeaderId(this.getLeaderId())
return to.build();
}
- public static AppendEntries fromSerializable(Object o){
- AppendEntriesMessages.AppendEntries from = (AppendEntriesMessages.AppendEntries) o;
+ public static AppendEntries fromSerializable(Object serialized) {
+ if(serialized instanceof AppendEntries) {
+ return (AppendEntries)serialized;
+ }
+ else {
+ return fromLegacySerializable((AppendEntriesMessages.AppendEntries) serialized);
+ }
+ }
+ private static AppendEntries fromLegacySerializable(AppendEntriesMessages.AppendEntries from) {
List<ReplicatedLogEntry> logEntryList = new ArrayList<>();
for (AppendEntriesMessages.AppendEntries.ReplicatedLogEntry leProtoBuff : from.getLogEntriesList()) {
String clientPayloadClassName = leProtoBuff.getData().getClientPayloadClassName();
payload = (Payload) Class.forName(clientPayloadClassName).newInstance();
payload = payload.decode(leProtoBuff.getData());
- payload.setClientPayloadClassName(clientPayloadClassName);
} else {
LOG.error("Payload is null or payload does not have client payload class name");
}
return to;
}
+
+ public static boolean isSerializedType(Object message) {
+ return message instanceof AppendEntries || LEGACY_SERIALIZABLE_CLASS.isInstance(message);
+ }
}
private long currentTerm = 0;
private String votedFor = "";
+ @Override
public long getCurrentTerm() {
return currentTerm;
}
+ @Override
public String getVotedFor() {
return votedFor;
}
+ @Override
public void update(long currentTerm, String votedFor){
this.currentTerm = currentTerm;
this.votedFor = votedFor;
return lastApplied;
}
+ @Override
public void setReplicatedLog(ReplicatedLog replicatedLog) {
this.replicatedLog = replicatedLog;
}
}
public static class MockPayload extends Payload implements Serializable {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 3121380393130864247L;
private String value = "";
public MockPayload(){
return MockPayload.class.getName();
}
+ @Override
public String toString() {
return value;
}
}
public static class MockReplicatedLogBuilder {
- private ReplicatedLog mockLog = new SimpleReplicatedLog();
+ private final ReplicatedLog mockLog = new SimpleReplicatedLog();
public MockReplicatedLogBuilder createEntries(int start, int end, int term) {
for (int i=start; i<end; i++) {
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for ReplicatedLogImplEntry.
+ *
+ * @author Thomas Pantelis
+ */
+public class ReplicatedLogImplEntryTest {
+
+ @Test
+ public void testBackwardsCompatibleDeserializationFromHelium() throws Exception {
+ String expPayloadData = "This is a test";
+ int expIndex = 1;
+ int expTerm = 2;
+
+ try(FileInputStream fis = new FileInputStream("src/test/resources/helium-serialized-ReplicatedLogImplEntry")) {
+ ObjectInputStream ois = new ObjectInputStream(fis);
+
+ ReplicatedLogImplEntry entry = (ReplicatedLogImplEntry) ois.readObject();
+ ois.close();
+
+ Assert.assertEquals("getIndex", expIndex, entry.getIndex());
+ Assert.assertEquals("getTerm", expTerm, entry.getTerm());
+
+ MockRaftActorContext.MockPayload payload = (MockRaftActorContext.MockPayload) entry.getData();
+ Assert.assertEquals("data", expPayloadData, payload.toString());
+ }
+ }
+
+ /**
+ * Use this method to generate a file with a serialized ReplicatedLogImplEntry instance to be
+ * used in tests that verify backwards compatible de-serialization.
+ */
+ private void generateSerializedFile() throws IOException {
+ String expPayloadData = "This is a test";
+ int expIndex = 1;
+ int expTerm = 2;
+
+ ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(expIndex, expTerm,
+ new MockRaftActorContext.MockPayload(expPayloadData));
+ FileOutputStream fos = new FileOutputStream("src/test/resources/serialized-ReplicatedLogImplEntry");
+ ObjectOutputStream oos = new ObjectOutputStream(fos);
+ oos.writeObject(entry);
+ fos.close();
+ }
+}
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
import scala.concurrent.duration.FiniteDuration;
public class LeaderTest extends AbstractRaftActorBehaviorTest {
- private ActorRef leaderActor =
+ private final ActorRef leaderActor =
getSystem().actorOf(Props.create(DoNothingActor.class));
- private ActorRef senderActor =
+ private final ActorRef senderActor =
getSystem().actorOf(Props.create(DoNothingActor.class));
@Test
leader.handleMessage(leaderActor, new SendHeartBeat());
- AppendEntriesMessages.AppendEntries aeproto = (AppendEntriesMessages.AppendEntries)MessageCollectorActor.getFirstMatching(
- followerActor, AppendEntries.SERIALIZABLE_CLASS);
+ AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching(
+ followerActor, AppendEntries.class);
assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
"received", aeproto);
leader.handleMessage(leaderActor, new SendHeartBeat());
- AppendEntriesMessages.AppendEntries appendEntries =
- (AppendEntriesMessages.AppendEntries) MessageCollectorActor
- .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
+ AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
+ .getFirstMatching(followerActor, AppendEntries.class);
assertNotNull(appendEntries);
assertEquals(1, appendEntries.getLeaderCommit());
- assertEquals(1, appendEntries.getLogEntries(0).getIndex());
+ assertEquals(1, appendEntries.getEntries().get(0).getIndex());
assertEquals(0, appendEntries.getPrevLogIndex());
AppendEntriesReply appendEntriesReply =
leader.handleMessage(leaderActor, new SendHeartBeat());
- AppendEntriesMessages.AppendEntries appendEntries =
- (AppendEntriesMessages.AppendEntries) MessageCollectorActor
- .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
+ AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
+ .getFirstMatching(followerActor, AppendEntries.class);
assertNotNull(appendEntries);
assertEquals(1, appendEntries.getLeaderCommit());
- assertEquals(1, appendEntries.getLogEntries(0).getIndex());
+ assertEquals(1, appendEntries.getEntries().get(0).getIndex());
assertEquals(0, appendEntries.getPrevLogIndex());
AppendEntriesReply appendEntriesReply =
private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
- private long electionTimeOutIntervalMillis;
- private int snapshotChunkSize;
+ private final long electionTimeOutIntervalMillis;
+ private final int snapshotChunkSize;
public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
super();
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.messages;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+
+/**
+ * Unit tests for AppendEntries.
+ *
+ * @author Thomas Pantelis
+ */
+public class AppendEntriesTest {
+
+ @Test
+ public void testSerialization() {
+ ReplicatedLogEntry entry1 = new ReplicatedLogImplEntry(1, 2, new MockPayload("payload1"));
+
+ ReplicatedLogEntry entry2 = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload2"));
+
+ AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L);
+
+ AppendEntries cloned = (AppendEntries) SerializationUtils.clone(expected);
+
+ verifyAppendEntries(expected, cloned);
+ }
+
+ @Test
+ public void testToAndFromSerializable() {
+ AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L,
+ Collections.<ReplicatedLogEntry>emptyList(), 10L);
+
+ assertSame("toSerializable", entries, entries.toSerializable());
+ assertSame("fromSerializable", entries,
+ org.opendaylight.controller.cluster.raft.SerializationUtils.fromSerializable(entries));
+ }
+
+ @Test
+ public void testToAndFromLegacySerializable() {
+ ReplicatedLogEntry entry = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload"));
+ AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry), 10L);
+
+ Object serializable = entries.toSerializable(RaftVersions.HELIUM_VERSION);
+ Assert.assertTrue(serializable instanceof AppendEntriesMessages.AppendEntries);
+
+ AppendEntries entries2 = (AppendEntries)
+ org.opendaylight.controller.cluster.raft.SerializationUtils.fromSerializable(serializable);
+
+ verifyAppendEntries(entries, entries2);
+ }
+
+ private void verifyAppendEntries(AppendEntries expected, AppendEntries actual) {
+ assertEquals("getLeaderId", expected.getLeaderId(), actual.getLeaderId());
+ assertEquals("getTerm", expected.getTerm(), actual.getTerm());
+ assertEquals("getLeaderCommit", expected.getLeaderCommit(), actual.getLeaderCommit());
+ assertEquals("getPrevLogIndex", expected.getPrevLogIndex(), actual.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", expected.getPrevLogTerm(), actual.getPrevLogTerm());
+
+ assertEquals("getEntries size", expected.getEntries().size(), actual.getEntries().size());
+ Iterator<ReplicatedLogEntry> iter = expected.getEntries().iterator();
+ for(ReplicatedLogEntry e: actual.getEntries()) {
+ verifyReplicatedLogEntry(iter.next(), e);
+ }
+ }
+
+ private void verifyReplicatedLogEntry(ReplicatedLogEntry expected, ReplicatedLogEntry actual) {
+ assertEquals("getIndex", expected.getIndex(), actual.getIndex());
+ assertEquals("getTerm", expected.getTerm(), actual.getTerm());
+ assertEquals("getData", expected.getData().toString(), actual.getData().toString());
+ }
+}
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
AppendEntries appendEntries =
new AppendEntries(1, "member-1", 0, 100, entries, 1);
- AppendEntriesMessages.AppendEntries o = (AppendEntriesMessages.AppendEntries) appendEntries.toSerializable();
+ AppendEntriesMessages.AppendEntries o = (AppendEntriesMessages.AppendEntries)
+ appendEntries.toSerializable(RaftVersions.HELIUM_VERSION);
o.writeDelimitedTo(new FileOutputStream(SERIALIZE_OUT));
public static class ServerActor extends UntypedActor {
@Override public void onReceive(Object message) throws Exception {
- if(AppendEntries.SERIALIZABLE_CLASS.equals(message.getClass())){
+ if(AppendEntries.LEGACY_SERIALIZABLE_CLASS.equals(message.getClass())){
AppendEntries appendEntries =
AppendEntries.fromSerializable(message);