package org.opendaylight.controller.cluster.raft;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
public class SerializationUtils {
public static Object fromSerializable(Object serializable){
- if(AppendEntries.isSerializedType(serializable)){
- return AppendEntries.fromSerializable(serializable);
-
- } else if (serializable.getClass().equals(InstallSnapshot.SERIALIZABLE_CLASS)) {
+ if (serializable.getClass().equals(InstallSnapshot.SERIALIZABLE_CLASS)) {
return InstallSnapshot.fromSerializable(serializable);
}
return serializable;
}
-
}
appendEntries);
}
- followerActor.tell(appendEntries.toSerializable(), actor());
+ followerActor.tell(appendEntries, actor());
}
/**
package org.opendaylight.controller.cluster.raft.messages;
-import com.google.protobuf.GeneratedMessage;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import org.opendaylight.controller.cluster.raft.RaftVersions;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
/**
* Invoked by leader to replicate log entries (ยง5.3); also used as
public class AppendEntries extends AbstractRaftRPC {
private static final long serialVersionUID = 1L;
- public static final Class<AppendEntriesMessages.AppendEntries> LEGACY_SERIALIZABLE_CLASS =
- AppendEntriesMessages.AppendEntries.class;
-
private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(AppendEntries.class);
// So that follower can redirect clients
.append(payloadVersion).append(", entries=").append(entries).append("]");
return builder.toString();
}
-
- public <T extends Object> Object toSerializable() {
- return toSerializable(RaftVersions.CURRENT_VERSION);
- }
-
- public <T extends Object> Object toSerializable(short version) {
- if(version < RaftVersions.LITHIUM_VERSION) {
- return toLegacySerializable();
- } else {
- return this;
- }
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- private <T> Object toLegacySerializable() {
- AppendEntriesMessages.AppendEntries.Builder to = AppendEntriesMessages.AppendEntries.newBuilder();
- to.setTerm(this.getTerm())
- .setLeaderId(this.getLeaderId())
- .setPrevLogTerm(this.getPrevLogTerm())
- .setPrevLogIndex(this.getPrevLogIndex())
- .setLeaderCommit(this.getLeaderCommit());
-
- for (ReplicatedLogEntry logEntry : this.getEntries()) {
-
- AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Builder arBuilder =
- AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.newBuilder();
-
- AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.Builder arpBuilder =
- AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.newBuilder();
-
- //get the client specific payload extensions and add them to the payload builder
- Map<GeneratedMessage.GeneratedExtension, T> map = logEntry.getData().encode();
- Iterator<Map.Entry<GeneratedMessage.GeneratedExtension, T>> iter = map.entrySet().iterator();
-
- while (iter.hasNext()) {
- Map.Entry<GeneratedMessage.GeneratedExtension, T> entry = iter.next();
- arpBuilder.setExtension(entry.getKey(), entry.getValue());
- }
-
- arpBuilder.setClientPayloadClassName(logEntry.getData().getClientPayloadClassName());
-
- arBuilder.setData(arpBuilder).setIndex(logEntry.getIndex()).setTerm(logEntry.getTerm());
- to.addLogEntries(arBuilder);
- }
-
- return to.build();
- }
-
- public static AppendEntries fromSerializable(Object serialized) {
- if(serialized instanceof AppendEntries) {
- return (AppendEntries)serialized;
- }
- else {
- return fromLegacySerializable((AppendEntriesMessages.AppendEntries) serialized);
- }
- }
-
- private static AppendEntries fromLegacySerializable(AppendEntriesMessages.AppendEntries from) {
- List<ReplicatedLogEntry> logEntryList = new ArrayList<>();
- for (AppendEntriesMessages.AppendEntries.ReplicatedLogEntry leProtoBuff : from.getLogEntriesList()) {
-
- Payload payload = null ;
- try {
- if(leProtoBuff.getData() != null && leProtoBuff.getData().getClientPayloadClassName() != null) {
- String clientPayloadClassName = leProtoBuff.getData().getClientPayloadClassName();
- payload = (Payload) Class.forName(clientPayloadClassName).newInstance();
- payload = payload.decode(leProtoBuff.getData());
- } else {
- LOG.error("Payload is null or payload does not have client payload class name");
- }
-
- } catch (InstantiationException e) {
- LOG.error("InstantiationException when instantiating "+leProtoBuff.getData().getClientPayloadClassName(), e);
- } catch (IllegalAccessException e) {
- LOG.error("IllegalAccessException when accessing "+leProtoBuff.getData().getClientPayloadClassName(), e);
- } catch (ClassNotFoundException e) {
- LOG.error("ClassNotFoundException when loading "+leProtoBuff.getData().getClientPayloadClassName(), e);
- }
- ReplicatedLogEntry logEntry = new ReplicatedLogImplEntry(
- leProtoBuff.getIndex(), leProtoBuff.getTerm(), payload);
- logEntryList.add(logEntry);
- }
-
- AppendEntries to = new AppendEntries(from.getTerm(),
- from.getLeaderId(),
- from.getPrevLogIndex(),
- from.getPrevLogTerm(),
- logEntryList,
- from.getLeaderCommit(), -1, (short)0);
-
- return to;
- }
-
- public static boolean isSerializedType(Object message) {
- return message instanceof AppendEntries || LEGACY_SERIALIZABLE_CLASS.isInstance(message);
- }
}
package org.opendaylight.controller.cluster.raft.messages;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Iterator;
import org.apache.commons.lang.SerializationUtils;
-import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.RaftVersions;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
/**
* Unit tests for AppendEntries.
verifyAppendEntries(expected, cloned);
}
- @Test
- public void testToAndFromSerializable() {
- AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L,
- Collections.<ReplicatedLogEntry>emptyList(), 10L, -1, (short)0);
-
- assertSame("toSerializable", entries, entries.toSerializable());
- assertSame("fromSerializable", entries,
- org.opendaylight.controller.cluster.raft.SerializationUtils.fromSerializable(entries));
- }
-
- @Test
- public void testToAndFromLegacySerializable() {
- ReplicatedLogEntry entry = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload"));
- AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry), 10L, -1, (short)0);
-
- Object serializable = entries.toSerializable(RaftVersions.HELIUM_VERSION);
- Assert.assertTrue(serializable instanceof AppendEntriesMessages.AppendEntries);
-
- AppendEntries entries2 = (AppendEntries)
- org.opendaylight.controller.cluster.raft.SerializationUtils.fromSerializable(serializable);
-
- verifyAppendEntries(entries, entries2);
- }
-
private static void verifyAppendEntries(AppendEntries expected, AppendEntries actual) {
assertEquals("getLeaderId", expected.getLeaderId(), actual.getLeaderId());
assertEquals("getTerm", expected.getTerm(), actual.getTerm());
package org.opendaylight.controller.cluster.datastore;
-import java.util.ArrayList;
-import java.util.List;
+import static junit.framework.Assert.assertTrue;
import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-
-import static junit.framework.Assert.assertNotNull;
-import static junit.framework.Assert.assertTrue;
@Deprecated
public class CompositeModificationByteStringPayloadTest {
- private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
-
@Test
public void testSerialization(){
WriteModification writeModification =
assertTrue(deserialize instanceof CompositeModificationByteStringPayload);
}
-
- @Test
- public void testAppendEntries(){
- List<ReplicatedLogEntry> entries = new ArrayList<>();
-
- WriteModification writeModification = new WriteModification(TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
-
- MutableCompositeModification compositeModification = new MutableCompositeModification();
-
- compositeModification.addModification(writeModification);
-
- CompositeModificationByteStringPayload payload =
- new CompositeModificationByteStringPayload(compositeModification.toSerializable());
-
- payload.clearModificationReference();
-
- entries.add(new ReplicatedLogImplEntry(0, 1, payload));
-
- assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10, -1, (short)0).toSerializable());
- }
}
+++ /dev/null
-/*
- * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import org.junit.Assert;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
-import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.raft.RaftVersions;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-
-@Deprecated
-public class CompositeModificationPayloadTest {
-
- @Test
- public void testBasic() throws IOException {
-
- List<ReplicatedLogEntry> entries = new ArrayList<>();
-
- entries.add(0, new ReplicatedLogEntry() {
- @Override public Payload getData() {
- WriteModification writeModification =
- new WriteModification(TestModel.TEST_PATH, ImmutableNodes
- .containerNode(TestModel.TEST_QNAME));
-
- MutableCompositeModification compositeModification =
- new MutableCompositeModification();
-
- compositeModification.addModification(writeModification);
-
- return new CompositeModificationPayload(compositeModification.toSerializable());
- }
-
- @Override public long getTerm() {
- return 1;
- }
-
- @Override public long getIndex() {
- return 1;
- }
-
- @Override
- public int size() {
- return getData().size();
- }
- });
-
- AppendEntries appendEntries =
- new AppendEntries(1, "member-1", 0, 100, entries, 1, -1, (short)0);
-
- AppendEntriesMessages.AppendEntries o = (AppendEntriesMessages.AppendEntries)
- appendEntries.toSerializable(RaftVersions.HELIUM_VERSION);
-
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- o.writeDelimitedTo(bos);
-
- AppendEntriesMessages.AppendEntries appendEntries2 =
- AppendEntriesMessages.AppendEntries
- .parseDelimitedFrom(new ByteArrayInputStream(bos.toByteArray()));
-
- AppendEntries appendEntries1 = AppendEntries.fromSerializable(appendEntries2);
-
- Payload data = appendEntries1.getEntries().get(0).getData();
-
-
- Assert.assertTrue(((CompositeModificationPayload) data).getModification().toString().contains(TestModel.TEST_QNAME.getNamespace().toString()));
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.programs.appendentries;
-
-import akka.actor.ActorSelection;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.UntypedActor;
-import com.typesafe.config.ConfigFactory;
-import java.util.ArrayList;
-import java.util.List;
-import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
-import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.example.messages.KeyValue;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-
-public class Client {
-
- private static ActorSystem actorSystem;
-
- public static class ClientActor extends UntypedActor {
-
- @Override public void onReceive(Object message) throws Exception {
-
- }
- }
-
- public static void main(String[] args){
- actorSystem = ActorSystem.create("appendentries", ConfigFactory
- .load().getConfig("ODLCluster"));
-
- ActorSelection actorSelection = actorSystem.actorSelection(
- "akka.tcp://appendentries@127.0.0.1:2550/user/server");
-
- AppendEntries appendEntries = modificationAppendEntries();
-
- Payload data = appendEntries.getEntries().get(0).getData();
- if(data instanceof CompositeModificationPayload) {
- System.out.println(
- "Sending : " + ((CompositeModificationPayload) data)
- .getModification());
- } else {
- System.out.println(
- "Sending : " + ((KeyValue) data)
- .getKey());
-
- }
-
- actorSelection.tell(appendEntries.toSerializable(), null);
-
-
-
-
- actorSystem.actorOf(Props.create(ClientActor.class), "client");
- }
-
- public static AppendEntries modificationAppendEntries() {
- List<ReplicatedLogEntry> modification = new ArrayList<>();
-
- modification.add(0, new ReplicatedLogEntry() {
- @Override public Payload getData() {
- WriteModification writeModification =
- new WriteModification(TestModel.TEST_PATH, ImmutableNodes
- .containerNode(TestModel.TEST_QNAME));
-
- MutableCompositeModification compositeModification =
- new MutableCompositeModification();
-
- compositeModification.addModification(writeModification);
-
- return new CompositeModificationByteStringPayload(
- compositeModification.toSerializable());
- }
-
- @Override public long getTerm() {
- return 1;
- }
-
- @Override public long getIndex() {
- return 1;
- }
-
- @Override
- public int size() {
- return getData().size();
- }
- });
-
- return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1, (short)0);
- }
-
- public static AppendEntries keyValueAppendEntries() {
- List<ReplicatedLogEntry> modification = new ArrayList<>();
-
- modification.add(0, new ReplicatedLogEntry() {
- @Override public Payload getData() {
- return new KeyValue("moiz", "test");
- }
-
- @Override public long getTerm() {
- return 1;
- }
-
- @Override public long getIndex() {
- return 1;
- }
-
- @Override
- public int size() {
- return getData().size();
- }
- });
-
- return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1, (short)0);
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.programs.appendentries;
-
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.UntypedActor;
-import com.typesafe.config.ConfigFactory;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
-import org.opendaylight.controller.cluster.example.messages.KeyValue;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-
-public class Server {
-
- private static ActorSystem actorSystem;
-
- public static class ServerActor extends UntypedActor {
-
- @Override public void onReceive(Object message) throws Exception {
- if(AppendEntries.LEGACY_SERIALIZABLE_CLASS.equals(message.getClass())){
- AppendEntries appendEntries =
- AppendEntries.fromSerializable(message);
-
- Payload data = appendEntries.getEntries()
- .get(0).getData();
- if(data instanceof KeyValue){
- System.out.println("Received : " + ((KeyValue) appendEntries.getEntries().get(0).getData()).getKey());
- } else {
- System.out.println("Received :" +
- ((CompositeModificationPayload) appendEntries
- .getEntries()
- .get(0).getData()).getModification().toString());
- }
- } else if(message instanceof String){
- System.out.println(message);
- }
- }
- }
-
- public static void main(String[] args){
- actorSystem = ActorSystem.create("appendentries", ConfigFactory
- .load().getConfig("ODLCluster"));
-
- actorSystem.actorOf(Props.create(ServerActor.class), "server");
- }
-}
if(o instanceof RequestVote){
RequestVote req = (RequestVote) o;
sender().tell(new RequestVoteReply(req.getTerm(), true), self());
- } else if(AppendEntries.LEGACY_SERIALIZABLE_CLASS.equals(o.getClass()) || o instanceof AppendEntries) {
- AppendEntries req = AppendEntries.fromSerializable(o);
- handleAppendEntries(req);
+ } else if(o instanceof AppendEntries) {
+ handleAppendEntries((AppendEntries)o);
} else if(InstallSnapshot.SERIALIZABLE_CLASS.equals(o.getClass())) {
InstallSnapshot req = InstallSnapshot.fromSerializable(o);
handleInstallSnapshot(req);