From 83641e25fafcabef7d64909b881a9a4b4b4dab87 Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Fri, 1 Aug 2014 16:14:05 -0700 Subject: [PATCH] Fix a couple of issues with replication - First the data that was being sent from the primary was not being deserialized on the followers - The state on the followers was not being applied correctly - Add sal-distributed-datastore to the distribution Added a test for CompositeModificationPayload to ensure that serialization works correctly Also added a set of integration test programs to test the serialization Change-Id: Id67d2c4fe471003d9dd3b42a8376c90fd23492ce Signed-off-by: Moiz Raja --- opendaylight/commons/opendaylight/pom.xml | 5 + .../distribution/opendaylight/pom.xml | 5 + .../sal-distributed-datastore/client.conf | 36 ++++++ .../sal-distributed-datastore/server.conf | 37 ++++++ .../CompositeModificationPayload.java | 26 +++- .../controller/cluster/datastore/Shard.java | 28 +++- .../cluster/datastore/TransactionProxy.java | 2 +- .../CompositeModificationPayloadTest.java | 84 ++++++++++++ .../programs/appendentries/Client.java | 120 ++++++++++++++++++ .../programs/appendentries/Server.java | 53 ++++++++ 10 files changed, 389 insertions(+), 7 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/client.conf create mode 100644 opendaylight/md-sal/sal-distributed-datastore/server.conf create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Server.java diff --git a/opendaylight/commons/opendaylight/pom.xml b/opendaylight/commons/opendaylight/pom.xml index 5ce5840da8..a7bbbe772d 100644 --- a/opendaylight/commons/opendaylight/pom.xml +++ b/opendaylight/commons/opendaylight/pom.xml @@ -1278,6 +1278,11 @@ sal-test-model ${mdsal.version} + + org.opendaylight.controller + sal-distributed-datastore + ${mdsal.version} + diff --git a/opendaylight/distribution/opendaylight/pom.xml b/opendaylight/distribution/opendaylight/pom.xml index 7c3289adb5..e4468b6f27 100644 --- a/opendaylight/distribution/opendaylight/pom.xml +++ b/opendaylight/distribution/opendaylight/pom.xml @@ -1291,6 +1291,11 @@ jeromq 0.3.1 + + org.opendaylight.controller + sal-distributed-datastore + + diff --git a/opendaylight/md-sal/sal-distributed-datastore/client.conf b/opendaylight/md-sal/sal-distributed-datastore/client.conf new file mode 100644 index 0000000000..90bfb4c3e1 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/client.conf @@ -0,0 +1,36 @@ +ODLCluster{ + akka { + actor { + serialize-messages = on + + provider = "akka.cluster.ClusterActorRefProvider" + serializers { + java = "akka.serialization.JavaSerializer" + proto = "akka.remote.serialization.ProtobufSerializer" + } + + serialization-bindings { + "com.google.protobuf.Message" = proto + "com.google.protobuf.GeneratedMessage" = proto + "com.google.protobuf.GeneratedMessage$GeneratedExtension" = proto + "com.google.protobuf.FieldSet" = proto + } + } + remote { + log-remote-lifecycle-events = off + netty.tcp { + hostname = "127.0.0.1" + port = 2552 + maximum-frame-size = 2097152 + send-buffer-size = 52428800 + receive-buffer-size = 52428800 + } + } + + cluster { + seed-nodes = ["akka.tcp://opendaylight-cluster@127.0.0.1:2550"] + + auto-down-unreachable-after = 10s + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/server.conf b/opendaylight/md-sal/sal-distributed-datastore/server.conf new file mode 100644 index 0000000000..6209adfc17 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/server.conf @@ -0,0 +1,37 @@ + +ODLCluster{ + akka { + actor { + serialize-messages = on + + provider = "akka.cluster.ClusterActorRefProvider" + serializers { + java = "akka.serialization.JavaSerializer" + proto = "akka.remote.serialization.ProtobufSerializer" + } + + serialization-bindings { + "com.google.protobuf.Message" = proto + "com.google.protobuf.GeneratedMessage" = proto + "com.google.protobuf.GeneratedMessage$GeneratedExtension" = proto + "com.google.protobuf.FieldSet" = proto + } + } + remote { + log-remote-lifecycle-events = off + netty.tcp { + hostname = "127.0.0.1" + port = 2550 + maximum-frame-size = 2097152 + send-buffer-size = 52428800 + receive-buffer-size = 52428800 + } + } + + cluster { + seed-nodes = ["akka.tcp://opendaylight-cluster@127.0.0.1:2550"] + + auto-down-unreachable-after = 10s + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayload.java index 955e4bbf22..abc69f1897 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayload.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayload.java @@ -10,7 +10,8 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.Preconditions; import com.google.protobuf.GeneratedMessage; -import org.opendaylight.controller.cluster.example.protobuff.messages.KeyValueMessages; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.UnknownFieldSet; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages; import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages; @@ -43,7 +44,28 @@ public class CompositeModificationPayload extends Payload implements PersistentMessages.CompositeModification modification = payload .getExtension( org.opendaylight.controller.mdsal.CompositeModificationPayload.modification); - payload.getExtension(KeyValueMessages.value); + + + + // The extension was put in the unknown field. + // This is because extensions need to be registered + // see org.opendaylight.controller.mdsal.CompositeModificationPayload.registerAllExtensions + // also see https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/ExtensionRegistry + // If that is not done then on the other end the extension shows up as an unknown field + // Need to figure out a better way to do this + if(payload.getUnknownFields().hasField(2)){ + UnknownFieldSet.Field field = + payload.getUnknownFields().getField(2); + + try { + modification = + PersistentMessages.CompositeModification + .parseFrom(field.getLengthDelimitedList().get(0)); + } catch (InvalidProtocolBufferException e) { + + } + } + return new CompositeModificationPayload(modification); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index a7089a7f75..46f09217d0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -157,10 +157,23 @@ public class Shard extends RaftActor { modificationToCohort.remove(serialized); if (cohort == null) { LOG.error( - "Could not find cohort for modification : " + modification); + "Could not find cohort for modification : {}", modification); LOG.info("Writing modification using a new transaction"); - modification.apply(store.newReadWriteTransaction()); - return; + DOMStoreReadWriteTransaction transaction = + store.newReadWriteTransaction(); + modification.apply(transaction); + DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); + ListenableFuture future = + commitCohort.preCommit(); + try { + future.get(); + future = commitCohort.commit(); + future.get(); + } catch (InterruptedException e) { + LOG.error("Failed to commit", e); + } catch (ExecutionException e) { + LOG.error("Failed to commit", e); + } } final ListenableFuture future = cohort.commit(); @@ -250,7 +263,14 @@ public class Shard extends RaftActor { if(data instanceof CompositeModificationPayload){ Object modification = ((CompositeModificationPayload) data).getModification(); - commit(clientActor, modification); + + if(modification != null){ + commit(clientActor, modification); + } else { + LOG.error("modification is null - this is very unexpected"); + } + + } else { LOG.error("Unknown state received {}", data); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index cbd61b2087..b434c9e463 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -201,7 +201,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { remoteTransactionPaths.put(shardName, transactionContext); } } catch(TimeoutException e){ - LOG.warn("Timed out trying to create transaction on shard {}: {}", shardName, e); + LOG.error("Creating NoOpTransaction because of : {}", e.getMessage()); remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName)); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java new file mode 100644 index 0000000000..400eab1d8e --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java @@ -0,0 +1,84 @@ +package org.opendaylight.controller.cluster.datastore; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; +import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class CompositeModificationPayloadTest { + + + private static final String SERIALIZE_OUT = "serialize.out"; + + @After + public void shutDown(){ + File f = new File(SERIALIZE_OUT); + if(f.exists()){ + f.delete(); + } + } + + @Test + public void testBasic() throws IOException { + + List entries = new ArrayList<>(); + + entries.add(0, new ReplicatedLogEntry() { + @Override public Payload getData() { + WriteModification writeModification = + new WriteModification(TestModel.TEST_PATH, ImmutableNodes + .containerNode(TestModel.TEST_QNAME), + TestModel.createTestContext()); + + MutableCompositeModification compositeModification = + new MutableCompositeModification(); + + compositeModification.addModification(writeModification); + + return new CompositeModificationPayload(compositeModification.toSerializable()); + } + + @Override public long getTerm() { + return 1; + } + + @Override public long getIndex() { + return 1; + } + }); + + AppendEntries appendEntries = + new AppendEntries(1, "member-1", 0, 100, entries, 1); + + AppendEntriesMessages.AppendEntries o = (AppendEntriesMessages.AppendEntries) appendEntries.toSerializable(); + + o.writeDelimitedTo(new FileOutputStream(SERIALIZE_OUT)); + + AppendEntriesMessages.AppendEntries appendEntries2 = + AppendEntriesMessages.AppendEntries + .parseDelimitedFrom(new FileInputStream(SERIALIZE_OUT)); + + AppendEntries appendEntries1 = AppendEntries.fromSerializable(appendEntries2); + + Payload data = appendEntries1.getEntries().get(0).getData(); + + + Assert.assertTrue(((CompositeModificationPayload) data).getModification().toString().contains(TestModel.TEST_QNAME.getNamespace().toString())); + + } + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java new file mode 100644 index 0000000000..2671be80bb --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java @@ -0,0 +1,120 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.programs.appendentries; + +import akka.actor.ActorSelection; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.actor.UntypedActor; +import com.typesafe.config.ConfigFactory; +import org.opendaylight.controller.cluster.datastore.CompositeModificationPayload; +import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; +import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.controller.cluster.example.messages.KeyValue; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; + +import java.util.ArrayList; +import java.util.List; + +public class Client { + + private static ActorSystem actorSystem; + + public static class ClientActor extends UntypedActor { + + @Override public void onReceive(Object message) throws Exception { + + } + } + + public static void main(String[] args){ + actorSystem = ActorSystem.create("appendentries", ConfigFactory + .load().getConfig("ODLCluster")); + + ActorSelection actorSelection = actorSystem.actorSelection( + "akka.tcp://appendentries@127.0.0.1:2550/user/server"); + + AppendEntries appendEntries = modificationAppendEntries(); + + Payload data = appendEntries.getEntries().get(0).getData(); + if(data instanceof CompositeModificationPayload) { + System.out.println( + "Sending : " + ((CompositeModificationPayload) data) + .getModification()); + } else { + System.out.println( + "Sending : " + ((KeyValue) data) + .getKey()); + + } + + actorSelection.tell(appendEntries.toSerializable(), null); + + + + + actorSystem.actorOf(Props.create(ClientActor.class), "client"); + } + + public static AppendEntries modificationAppendEntries() { + List modification = new ArrayList<>(); + + modification.add(0, new ReplicatedLogEntry() { + @Override public Payload getData() { + WriteModification writeModification = + new WriteModification(TestModel.TEST_PATH, ImmutableNodes + .containerNode(TestModel.TEST_QNAME), + TestModel.createTestContext() + ); + + MutableCompositeModification compositeModification = + new MutableCompositeModification(); + + compositeModification.addModification(writeModification); + + return new CompositeModificationPayload( + compositeModification.toSerializable()); + } + + @Override public long getTerm() { + return 1; + } + + @Override public long getIndex() { + return 1; + } + }); + + return new AppendEntries(1, "member-1", 0, 100, modification, 1); + } + + public static AppendEntries keyValueAppendEntries() { + List modification = new ArrayList<>(); + + modification.add(0, new ReplicatedLogEntry() { + @Override public Payload getData() { + return new KeyValue("moiz", "test"); + } + + @Override public long getTerm() { + return 1; + } + + @Override public long getIndex() { + return 1; + } + }); + + return new AppendEntries(1, "member-1", 0, 100, modification, 1); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Server.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Server.java new file mode 100644 index 0000000000..0e6d535301 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Server.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.programs.appendentries; + +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.actor.UntypedActor; +import com.typesafe.config.ConfigFactory; +import org.opendaylight.controller.cluster.datastore.CompositeModificationPayload; +import org.opendaylight.controller.cluster.example.messages.KeyValue; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; + +public class Server { + + private static ActorSystem actorSystem; + + public static class ServerActor extends UntypedActor { + + @Override public void onReceive(Object message) throws Exception { + if(AppendEntries.SERIALIZABLE_CLASS.equals(message.getClass())){ + AppendEntries appendEntries = + AppendEntries.fromSerializable(message); + + Payload data = appendEntries.getEntries() + .get(0).getData(); + if(data instanceof KeyValue){ + System.out.println("Received : " + ((KeyValue) appendEntries.getEntries().get(0).getData()).getKey()); + } else { + System.out.println("Received :" + + ((CompositeModificationPayload) appendEntries + .getEntries() + .get(0).getData()).getModification().toString()); + } + } else if(message instanceof String){ + System.out.println(message); + } + } + } + + public static void main(String[] args){ + actorSystem = ActorSystem.create("appendentries", ConfigFactory + .load().getConfig("ODLCluster")); + + actorSystem.actorOf(Props.create(ServerActor.class), "server"); + } +} -- 2.36.6