- First the data that was being sent from the primary was not being deserialized on the followers
- The state on the followers was not being applied correctly
- Add sal-distributed-datastore to the distribution
Added a test for CompositeModificationPayload to ensure that serialization works correctly
Also added a set of integration test programs to test the serialization
Change-Id: Id67d2c4fe471003d9dd3b42a8376c90fd23492ce
Signed-off-by: Moiz Raja <moraja@cisco.com>
<artifactId>sal-test-model</artifactId>
<version>${mdsal.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-distributed-datastore</artifactId>
+ <version>${mdsal.version}</version>
+ </dependency>
<!-- SAL Extension bundles -->
<dependency>
<artifactId>jeromq</artifactId>
<version>0.3.1</version>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-distributed-datastore</artifactId>
+ </dependency>
+
</dependencies>
</profile>
<profile>
--- /dev/null
+ODLCluster{
+ akka {
+ actor {
+ serialize-messages = on
+
+ provider = "akka.cluster.ClusterActorRefProvider"
+ serializers {
+ java = "akka.serialization.JavaSerializer"
+ proto = "akka.remote.serialization.ProtobufSerializer"
+ }
+
+ serialization-bindings {
+ "com.google.protobuf.Message" = proto
+ "com.google.protobuf.GeneratedMessage" = proto
+ "com.google.protobuf.GeneratedMessage$GeneratedExtension" = proto
+ "com.google.protobuf.FieldSet" = proto
+ }
+ }
+ remote {
+ log-remote-lifecycle-events = off
+ netty.tcp {
+ hostname = "127.0.0.1"
+ port = 2552
+ maximum-frame-size = 2097152
+ send-buffer-size = 52428800
+ receive-buffer-size = 52428800
+ }
+ }
+
+ cluster {
+ seed-nodes = ["akka.tcp://opendaylight-cluster@127.0.0.1:2550"]
+
+ auto-down-unreachable-after = 10s
+ }
+ }
+}
--- /dev/null
+
+ODLCluster{
+ akka {
+ actor {
+ serialize-messages = on
+
+ provider = "akka.cluster.ClusterActorRefProvider"
+ serializers {
+ java = "akka.serialization.JavaSerializer"
+ proto = "akka.remote.serialization.ProtobufSerializer"
+ }
+
+ serialization-bindings {
+ "com.google.protobuf.Message" = proto
+ "com.google.protobuf.GeneratedMessage" = proto
+ "com.google.protobuf.GeneratedMessage$GeneratedExtension" = proto
+ "com.google.protobuf.FieldSet" = proto
+ }
+ }
+ remote {
+ log-remote-lifecycle-events = off
+ netty.tcp {
+ hostname = "127.0.0.1"
+ port = 2550
+ maximum-frame-size = 2097152
+ send-buffer-size = 52428800
+ receive-buffer-size = 52428800
+ }
+ }
+
+ cluster {
+ seed-nodes = ["akka.tcp://opendaylight-cluster@127.0.0.1:2550"]
+
+ auto-down-unreachable-after = 10s
+ }
+ }
+}
import com.google.common.base.Preconditions;
import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.cluster.example.protobuff.messages.KeyValueMessages;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.UnknownFieldSet;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages;
import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
PersistentMessages.CompositeModification modification = payload
.getExtension(
org.opendaylight.controller.mdsal.CompositeModificationPayload.modification);
- payload.getExtension(KeyValueMessages.value);
+
+
+
+ // The extension was put in the unknown field.
+ // This is because extensions need to be registered
+ // see org.opendaylight.controller.mdsal.CompositeModificationPayload.registerAllExtensions
+ // also see https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/ExtensionRegistry
+ // If that is not done then on the other end the extension shows up as an unknown field
+ // Need to figure out a better way to do this
+ if(payload.getUnknownFields().hasField(2)){
+ UnknownFieldSet.Field field =
+ payload.getUnknownFields().getField(2);
+
+ try {
+ modification =
+ PersistentMessages.CompositeModification
+ .parseFrom(field.getLengthDelimitedList().get(0));
+ } catch (InvalidProtocolBufferException e) {
+
+ }
+ }
+
return new CompositeModificationPayload(modification);
}
modificationToCohort.remove(serialized);
if (cohort == null) {
LOG.error(
- "Could not find cohort for modification : " + modification);
+ "Could not find cohort for modification : {}", modification);
LOG.info("Writing modification using a new transaction");
- modification.apply(store.newReadWriteTransaction());
- return;
+ DOMStoreReadWriteTransaction transaction =
+ store.newReadWriteTransaction();
+ modification.apply(transaction);
+ DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+ ListenableFuture<Void> future =
+ commitCohort.preCommit();
+ try {
+ future.get();
+ future = commitCohort.commit();
+ future.get();
+ } catch (InterruptedException e) {
+ LOG.error("Failed to commit", e);
+ } catch (ExecutionException e) {
+ LOG.error("Failed to commit", e);
+ }
}
final ListenableFuture<Void> future = cohort.commit();
if(data instanceof CompositeModificationPayload){
Object modification =
((CompositeModificationPayload) data).getModification();
- commit(clientActor, modification);
+
+ if(modification != null){
+ commit(clientActor, modification);
+ } else {
+ LOG.error("modification is null - this is very unexpected");
+ }
+
+
} else {
LOG.error("Unknown state received {}", data);
}
remoteTransactionPaths.put(shardName, transactionContext);
}
} catch(TimeoutException e){
- LOG.warn("Timed out trying to create transaction on shard {}: {}", shardName, e);
+ LOG.error("Creating NoOpTransaction because of : {}", e.getMessage());
remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName));
}
}
--- /dev/null
+package org.opendaylight.controller.cluster.datastore;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class CompositeModificationPayloadTest {
+
+
+ private static final String SERIALIZE_OUT = "serialize.out";
+
+ @After
+ public void shutDown(){
+ File f = new File(SERIALIZE_OUT);
+ if(f.exists()){
+ f.delete();
+ }
+ }
+
+ @Test
+ public void testBasic() throws IOException {
+
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+
+ entries.add(0, new ReplicatedLogEntry() {
+ @Override public Payload getData() {
+ WriteModification writeModification =
+ new WriteModification(TestModel.TEST_PATH, ImmutableNodes
+ .containerNode(TestModel.TEST_QNAME),
+ TestModel.createTestContext());
+
+ MutableCompositeModification compositeModification =
+ new MutableCompositeModification();
+
+ compositeModification.addModification(writeModification);
+
+ return new CompositeModificationPayload(compositeModification.toSerializable());
+ }
+
+ @Override public long getTerm() {
+ return 1;
+ }
+
+ @Override public long getIndex() {
+ return 1;
+ }
+ });
+
+ AppendEntries appendEntries =
+ new AppendEntries(1, "member-1", 0, 100, entries, 1);
+
+ AppendEntriesMessages.AppendEntries o = (AppendEntriesMessages.AppendEntries) appendEntries.toSerializable();
+
+ o.writeDelimitedTo(new FileOutputStream(SERIALIZE_OUT));
+
+ AppendEntriesMessages.AppendEntries appendEntries2 =
+ AppendEntriesMessages.AppendEntries
+ .parseDelimitedFrom(new FileInputStream(SERIALIZE_OUT));
+
+ AppendEntries appendEntries1 = AppendEntries.fromSerializable(appendEntries2);
+
+ Payload data = appendEntries1.getEntries().get(0).getData();
+
+
+ Assert.assertTrue(((CompositeModificationPayload) data).getModification().toString().contains(TestModel.TEST_QNAME.getNamespace().toString()));
+
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.programs.appendentries;
+
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import com.typesafe.config.ConfigFactory;
+import org.opendaylight.controller.cluster.datastore.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.example.messages.KeyValue;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class Client {
+
+ private static ActorSystem actorSystem;
+
+ public static class ClientActor extends UntypedActor {
+
+ @Override public void onReceive(Object message) throws Exception {
+
+ }
+ }
+
+ public static void main(String[] args){
+ actorSystem = ActorSystem.create("appendentries", ConfigFactory
+ .load().getConfig("ODLCluster"));
+
+ ActorSelection actorSelection = actorSystem.actorSelection(
+ "akka.tcp://appendentries@127.0.0.1:2550/user/server");
+
+ AppendEntries appendEntries = modificationAppendEntries();
+
+ Payload data = appendEntries.getEntries().get(0).getData();
+ if(data instanceof CompositeModificationPayload) {
+ System.out.println(
+ "Sending : " + ((CompositeModificationPayload) data)
+ .getModification());
+ } else {
+ System.out.println(
+ "Sending : " + ((KeyValue) data)
+ .getKey());
+
+ }
+
+ actorSelection.tell(appendEntries.toSerializable(), null);
+
+
+
+
+ actorSystem.actorOf(Props.create(ClientActor.class), "client");
+ }
+
+ public static AppendEntries modificationAppendEntries() {
+ List<ReplicatedLogEntry> modification = new ArrayList<>();
+
+ modification.add(0, new ReplicatedLogEntry() {
+ @Override public Payload getData() {
+ WriteModification writeModification =
+ new WriteModification(TestModel.TEST_PATH, ImmutableNodes
+ .containerNode(TestModel.TEST_QNAME),
+ TestModel.createTestContext()
+ );
+
+ MutableCompositeModification compositeModification =
+ new MutableCompositeModification();
+
+ compositeModification.addModification(writeModification);
+
+ return new CompositeModificationPayload(
+ compositeModification.toSerializable());
+ }
+
+ @Override public long getTerm() {
+ return 1;
+ }
+
+ @Override public long getIndex() {
+ return 1;
+ }
+ });
+
+ return new AppendEntries(1, "member-1", 0, 100, modification, 1);
+ }
+
+ public static AppendEntries keyValueAppendEntries() {
+ List<ReplicatedLogEntry> modification = new ArrayList<>();
+
+ modification.add(0, new ReplicatedLogEntry() {
+ @Override public Payload getData() {
+ return new KeyValue("moiz", "test");
+ }
+
+ @Override public long getTerm() {
+ return 1;
+ }
+
+ @Override public long getIndex() {
+ return 1;
+ }
+ });
+
+ return new AppendEntries(1, "member-1", 0, 100, modification, 1);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.programs.appendentries;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import com.typesafe.config.ConfigFactory;
+import org.opendaylight.controller.cluster.datastore.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.example.messages.KeyValue;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+
+public class Server {
+
+ private static ActorSystem actorSystem;
+
+ public static class ServerActor extends UntypedActor {
+
+ @Override public void onReceive(Object message) throws Exception {
+ if(AppendEntries.SERIALIZABLE_CLASS.equals(message.getClass())){
+ AppendEntries appendEntries =
+ AppendEntries.fromSerializable(message);
+
+ Payload data = appendEntries.getEntries()
+ .get(0).getData();
+ if(data instanceof KeyValue){
+ System.out.println("Received : " + ((KeyValue) appendEntries.getEntries().get(0).getData()).getKey());
+ } else {
+ System.out.println("Received :" +
+ ((CompositeModificationPayload) appendEntries
+ .getEntries()
+ .get(0).getData()).getModification().toString());
+ }
+ } else if(message instanceof String){
+ System.out.println(message);
+ }
+ }
+ }
+
+ public static void main(String[] args){
+ actorSystem = ActorSystem.create("appendentries", ConfigFactory
+ .load().getConfig("ODLCluster"));
+
+ actorSystem.actorOf(Props.create(ServerActor.class), "server");
+ }
+}