Fix a couple of issues with replication 96/9596/4
authorMoiz Raja <moraja@cisco.com>
Fri, 1 Aug 2014 23:14:05 +0000 (16:14 -0700)
committerMoiz Raja <moraja@cisco.com>
Sun, 3 Aug 2014 22:36:45 +0000 (15:36 -0700)
- First the data that was being sent from the primary was not being deserialized on the followers
- The state on the followers was not being applied correctly
- Add sal-distributed-datastore to the distribution

Added a test for CompositeModificationPayload to ensure that serialization works correctly
Also added a set of integration test programs to test the serialization

Change-Id: Id67d2c4fe471003d9dd3b42a8376c90fd23492ce
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/commons/opendaylight/pom.xml
opendaylight/distribution/opendaylight/pom.xml
opendaylight/md-sal/sal-distributed-datastore/client.conf [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/server.conf [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Server.java [new file with mode: 0644]

index 5ce5840da8fc4deb45df33c1d2fc102933c2fe7e..a7bbbe772d8514c150ec2a92de4acd3cdbca00d9 100644 (file)
         <artifactId>sal-test-model</artifactId>
         <version>${mdsal.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>sal-distributed-datastore</artifactId>
+        <version>${mdsal.version}</version>
+      </dependency>
 
       <!-- SAL Extension bundles -->
       <dependency>
index 7c3289adb5e0a7857227549535a8997be6837114..e4468b6f2735f4c7189ccf236fc48b5d8384e0e6 100644 (file)
           <artifactId>jeromq</artifactId>
           <version>0.3.1</version>
         </dependency>
+          <dependency>
+              <groupId>org.opendaylight.controller</groupId>
+              <artifactId>sal-distributed-datastore</artifactId>
+          </dependency>
+
       </dependencies>
     </profile>
     <profile>
diff --git a/opendaylight/md-sal/sal-distributed-datastore/client.conf b/opendaylight/md-sal/sal-distributed-datastore/client.conf
new file mode 100644 (file)
index 0000000..90bfb4c
--- /dev/null
@@ -0,0 +1,36 @@
+ODLCluster{
+  akka {
+    actor {
+      serialize-messages = on
+
+      provider = "akka.cluster.ClusterActorRefProvider"
+      serializers {
+                java = "akka.serialization.JavaSerializer"
+                proto = "akka.remote.serialization.ProtobufSerializer"
+              }
+
+              serialization-bindings {
+                  "com.google.protobuf.Message" = proto
+                  "com.google.protobuf.GeneratedMessage" = proto
+                  "com.google.protobuf.GeneratedMessage$GeneratedExtension" = proto
+                  "com.google.protobuf.FieldSet" = proto
+              }
+    }
+    remote {
+      log-remote-lifecycle-events = off
+      netty.tcp {
+        hostname = "127.0.0.1"
+        port = 2552
+       maximum-frame-size = 2097152
+       send-buffer-size = 52428800
+       receive-buffer-size = 52428800
+      }
+    }
+
+    cluster {
+      seed-nodes = ["akka.tcp://opendaylight-cluster@127.0.0.1:2550"]
+
+      auto-down-unreachable-after = 10s
+    }
+  }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/server.conf b/opendaylight/md-sal/sal-distributed-datastore/server.conf
new file mode 100644 (file)
index 0000000..6209adf
--- /dev/null
@@ -0,0 +1,37 @@
+
+ODLCluster{
+  akka {
+    actor {
+      serialize-messages = on
+
+      provider = "akka.cluster.ClusterActorRefProvider"
+      serializers {
+                java = "akka.serialization.JavaSerializer"
+                proto = "akka.remote.serialization.ProtobufSerializer"
+              }
+
+              serialization-bindings {
+                  "com.google.protobuf.Message" = proto
+                  "com.google.protobuf.GeneratedMessage" = proto
+                  "com.google.protobuf.GeneratedMessage$GeneratedExtension" = proto
+                  "com.google.protobuf.FieldSet" = proto
+              }
+    }
+    remote {
+      log-remote-lifecycle-events = off
+      netty.tcp {
+        hostname = "127.0.0.1"
+        port = 2550
+       maximum-frame-size = 2097152
+       send-buffer-size = 52428800
+       receive-buffer-size = 52428800
+      }
+    }
+
+    cluster {
+      seed-nodes = ["akka.tcp://opendaylight-cluster@127.0.0.1:2550"]
+
+      auto-down-unreachable-after = 10s
+    }
+  }
+}
index 955e4bbf220cc458cd20638967a91c849dab6909..abc69f18975aeb671713e955e11498f1c7d55050 100644 (file)
@@ -10,7 +10,8 @@ package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.cluster.example.protobuff.messages.KeyValueMessages;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.UnknownFieldSet;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages;
 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
@@ -43,7 +44,28 @@ public class CompositeModificationPayload extends Payload implements
         PersistentMessages.CompositeModification modification = payload
             .getExtension(
                 org.opendaylight.controller.mdsal.CompositeModificationPayload.modification);
-        payload.getExtension(KeyValueMessages.value);
+
+
+
+        // The extension was put in the unknown field.
+        // This is because extensions need to be registered
+        // see org.opendaylight.controller.mdsal.CompositeModificationPayload.registerAllExtensions
+        // also see https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/ExtensionRegistry
+        // If that is not done then on the other end the extension shows up as an unknown field
+        // Need to figure out a better way to do this
+        if(payload.getUnknownFields().hasField(2)){
+            UnknownFieldSet.Field field =
+                payload.getUnknownFields().getField(2);
+
+            try {
+                modification =
+                    PersistentMessages.CompositeModification
+                        .parseFrom(field.getLengthDelimitedList().get(0));
+            } catch (InvalidProtocolBufferException e) {
+
+            }
+        }
+
         return new CompositeModificationPayload(modification);
     }
 
index a7089a7f75c9cd01a5c336a5f400a71c30a605fd..46f09217d04a3119f97b4ae25b66737a22690caa 100644 (file)
@@ -157,10 +157,23 @@ public class Shard extends RaftActor {
             modificationToCohort.remove(serialized);
         if (cohort == null) {
             LOG.error(
-                "Could not find cohort for modification : " + modification);
+                "Could not find cohort for modification : {}", modification);
             LOG.info("Writing modification using a new transaction");
-            modification.apply(store.newReadWriteTransaction());
-            return;
+            DOMStoreReadWriteTransaction transaction =
+                store.newReadWriteTransaction();
+            modification.apply(transaction);
+            DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+            ListenableFuture<Void> future =
+                commitCohort.preCommit();
+            try {
+                future.get();
+                future = commitCohort.commit();
+                future.get();
+            } catch (InterruptedException e) {
+                LOG.error("Failed to commit", e);
+            } catch (ExecutionException e) {
+                LOG.error("Failed to commit", e);
+            }
         }
 
         final ListenableFuture<Void> future = cohort.commit();
@@ -250,7 +263,14 @@ public class Shard extends RaftActor {
         if(data instanceof CompositeModificationPayload){
             Object modification =
                 ((CompositeModificationPayload) data).getModification();
-            commit(clientActor, modification);
+
+            if(modification != null){
+                commit(clientActor, modification);
+            } else {
+                LOG.error("modification is null - this is very unexpected");
+            }
+
+
         } else {
             LOG.error("Unknown state received {}", data);
         }
index cbd61b2087ac3c2bfd38f77ea3a34ed51d6786d8..b434c9e463b90ef951599baccc13e4de38343ba5 100644 (file)
@@ -201,7 +201,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 remoteTransactionPaths.put(shardName, transactionContext);
             }
         } catch(TimeoutException e){
-            LOG.warn("Timed out trying to create transaction on shard {}: {}", shardName, e);
+            LOG.error("Creating NoOpTransaction because of : {}", e.getMessage());
             remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName));
         }
     }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java
new file mode 100644 (file)
index 0000000..400eab1
--- /dev/null
@@ -0,0 +1,84 @@
+package org.opendaylight.controller.cluster.datastore;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class CompositeModificationPayloadTest {
+
+
+    private static final String SERIALIZE_OUT = "serialize.out";
+
+    @After
+    public void shutDown(){
+        File f = new File(SERIALIZE_OUT);
+        if(f.exists()){
+            f.delete();
+        }
+    }
+
+    @Test
+    public void testBasic() throws IOException {
+
+        List<ReplicatedLogEntry> entries = new ArrayList<>();
+
+        entries.add(0, new ReplicatedLogEntry() {
+            @Override public Payload getData() {
+                WriteModification writeModification =
+                    new WriteModification(TestModel.TEST_PATH, ImmutableNodes
+                        .containerNode(TestModel.TEST_QNAME),
+                        TestModel.createTestContext());
+
+                MutableCompositeModification compositeModification =
+                    new MutableCompositeModification();
+
+                compositeModification.addModification(writeModification);
+
+                return new CompositeModificationPayload(compositeModification.toSerializable());
+            }
+
+            @Override public long getTerm() {
+                return 1;
+            }
+
+            @Override public long getIndex() {
+                return 1;
+            }
+        });
+
+        AppendEntries appendEntries =
+            new AppendEntries(1, "member-1", 0, 100, entries, 1);
+
+        AppendEntriesMessages.AppendEntries o = (AppendEntriesMessages.AppendEntries) appendEntries.toSerializable();
+
+        o.writeDelimitedTo(new FileOutputStream(SERIALIZE_OUT));
+
+        AppendEntriesMessages.AppendEntries appendEntries2 =
+            AppendEntriesMessages.AppendEntries
+                .parseDelimitedFrom(new FileInputStream(SERIALIZE_OUT));
+
+        AppendEntries appendEntries1 = AppendEntries.fromSerializable(appendEntries2);
+
+        Payload data = appendEntries1.getEntries().get(0).getData();
+
+
+        Assert.assertTrue(((CompositeModificationPayload) data).getModification().toString().contains(TestModel.TEST_QNAME.getNamespace().toString()));
+
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java
new file mode 100644 (file)
index 0000000..2671be8
--- /dev/null
@@ -0,0 +1,120 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.programs.appendentries;
+
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import com.typesafe.config.ConfigFactory;
+import org.opendaylight.controller.cluster.datastore.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.example.messages.KeyValue;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class Client {
+
+    private static ActorSystem actorSystem;
+
+    public static class ClientActor extends UntypedActor {
+
+        @Override public void onReceive(Object message) throws Exception {
+
+        }
+    }
+
+    public static void main(String[] args){
+        actorSystem = ActorSystem.create("appendentries", ConfigFactory
+            .load().getConfig("ODLCluster"));
+
+        ActorSelection actorSelection = actorSystem.actorSelection(
+            "akka.tcp://appendentries@127.0.0.1:2550/user/server");
+
+        AppendEntries appendEntries = modificationAppendEntries();
+
+        Payload data = appendEntries.getEntries().get(0).getData();
+        if(data instanceof CompositeModificationPayload) {
+            System.out.println(
+                "Sending : " + ((CompositeModificationPayload) data)
+                    .getModification());
+        } else {
+            System.out.println(
+                "Sending : " + ((KeyValue) data)
+                    .getKey());
+
+        }
+
+        actorSelection.tell(appendEntries.toSerializable(), null);
+
+
+
+
+        actorSystem.actorOf(Props.create(ClientActor.class), "client");
+    }
+
+    public static AppendEntries modificationAppendEntries() {
+        List<ReplicatedLogEntry> modification = new ArrayList<>();
+
+        modification.add(0, new ReplicatedLogEntry() {
+            @Override public Payload getData() {
+                WriteModification writeModification =
+                    new WriteModification(TestModel.TEST_PATH, ImmutableNodes
+                        .containerNode(TestModel.TEST_QNAME),
+                        TestModel.createTestContext()
+                    );
+
+                MutableCompositeModification compositeModification =
+                    new MutableCompositeModification();
+
+                compositeModification.addModification(writeModification);
+
+                return new CompositeModificationPayload(
+                    compositeModification.toSerializable());
+            }
+
+            @Override public long getTerm() {
+                return 1;
+            }
+
+            @Override public long getIndex() {
+                return 1;
+            }
+        });
+
+        return new AppendEntries(1, "member-1", 0, 100, modification, 1);
+    }
+
+    public static AppendEntries keyValueAppendEntries() {
+        List<ReplicatedLogEntry> modification = new ArrayList<>();
+
+        modification.add(0, new ReplicatedLogEntry() {
+            @Override public Payload getData() {
+                return new KeyValue("moiz", "test");
+            }
+
+            @Override public long getTerm() {
+                return 1;
+            }
+
+            @Override public long getIndex() {
+                return 1;
+            }
+        });
+
+        return new AppendEntries(1, "member-1", 0, 100, modification, 1);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Server.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Server.java
new file mode 100644 (file)
index 0000000..0e6d535
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.programs.appendentries;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import com.typesafe.config.ConfigFactory;
+import org.opendaylight.controller.cluster.datastore.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.example.messages.KeyValue;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+
+public class Server {
+
+    private static ActorSystem actorSystem;
+
+    public static class ServerActor extends UntypedActor {
+
+        @Override public void onReceive(Object message) throws Exception {
+            if(AppendEntries.SERIALIZABLE_CLASS.equals(message.getClass())){
+                AppendEntries appendEntries =
+                    AppendEntries.fromSerializable(message);
+
+                Payload data = appendEntries.getEntries()
+                    .get(0).getData();
+                if(data instanceof KeyValue){
+                    System.out.println("Received : " + ((KeyValue) appendEntries.getEntries().get(0).getData()).getKey());
+                } else {
+                    System.out.println("Received :" +
+                        ((CompositeModificationPayload) appendEntries
+                            .getEntries()
+                            .get(0).getData()).getModification().toString());
+                }
+            } else if(message instanceof String){
+                System.out.println(message);
+            }
+        }
+    }
+
+    public static void main(String[] args){
+        actorSystem = ActorSystem.create("appendentries", ConfigFactory
+            .load().getConfig("ODLCluster"));
+
+        actorSystem.actorOf(Props.create(ServerActor.class), "server");
+    }
+}