Merge "BUG 1162 - ensure data for all path arguments in datastore."
authorTony Tkacik <ttkacik@cisco.com>
Mon, 4 Aug 2014 14:11:50 +0000 (14:11 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 4 Aug 2014 14:11:50 +0000 (14:11 +0000)
30 files changed:
opendaylight/commons/opendaylight/pom.xml
opendaylight/distribution/opendaylight/pom.xml
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-distributed-datastore/client.conf [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/server.conf [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDataChangeListenerRegistration.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EnableNotification.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindLocalShard.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalShardFound.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalShardNotFound.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/EchoActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Server.java [new file with mode: 0644]

index 5ce5840da8fc4deb45df33c1d2fc102933c2fe7e..a7bbbe772d8514c150ec2a92de4acd3cdbca00d9 100644 (file)
         <artifactId>sal-test-model</artifactId>
         <version>${mdsal.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>sal-distributed-datastore</artifactId>
+        <version>${mdsal.version}</version>
+      </dependency>
 
       <!-- SAL Extension bundles -->
       <dependency>
index 7c3289adb5e0a7857227549535a8997be6837114..e4468b6f2735f4c7189ccf236fc48b5d8384e0e6 100644 (file)
           <artifactId>jeromq</artifactId>
           <version>0.3.1</version>
         </dependency>
+          <dependency>
+              <groupId>org.opendaylight.controller</groupId>
+              <artifactId>sal-distributed-datastore</artifactId>
+          </dependency>
+
       </dependencies>
     </profile>
     <profile>
index aa100df9d0517dfa014a78054e2319d8b1bd34fe..cbd7ca2d70f5dc090a1e842b75c200cb0c1976b9 100644 (file)
@@ -92,6 +92,10 @@ public class ExampleActor extends RaftActor {
         LOG.debug("Snapshot applied to state :" + ((HashMap) snapshot).size());
     }
 
+    @Override protected void onStateChanged() {
+
+    }
+
     @Override public void onReceiveRecover(Object message) {
         super.onReceiveRecover(message);
     }
index 70b85b4627dc707b0223f8a7bdae37dfab1441f1..caa0e507c1b0ec408745150863920f720c94a1cb 100644 (file)
@@ -341,6 +341,13 @@ public abstract class RaftActor extends UntypedPersistentActor {
      */
     protected abstract void applySnapshot(Object snapshot);
 
+    /**
+     * This method will be called by the RaftActor when the state of the
+     * RaftActor changes. The derived actor can then use methods like
+     * isLeader or getLeader to do something useful
+     */
+    protected abstract void onStateChanged();
+
     private RaftActorBehavior switchBehavior(RaftState state) {
         if (currentBehavior != null) {
             if (currentBehavior.state() == state) {
@@ -367,6 +374,9 @@ public abstract class RaftActor extends UntypedPersistentActor {
         } else {
             behavior = new Leader(context);
         }
+
+        onStateChanged();
+
         return behavior;
     }
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/client.conf b/opendaylight/md-sal/sal-distributed-datastore/client.conf
new file mode 100644 (file)
index 0000000..90bfb4c
--- /dev/null
@@ -0,0 +1,36 @@
+ODLCluster{
+  akka {
+    actor {
+      serialize-messages = on
+
+      provider = "akka.cluster.ClusterActorRefProvider"
+      serializers {
+                java = "akka.serialization.JavaSerializer"
+                proto = "akka.remote.serialization.ProtobufSerializer"
+              }
+
+              serialization-bindings {
+                  "com.google.protobuf.Message" = proto
+                  "com.google.protobuf.GeneratedMessage" = proto
+                  "com.google.protobuf.GeneratedMessage$GeneratedExtension" = proto
+                  "com.google.protobuf.FieldSet" = proto
+              }
+    }
+    remote {
+      log-remote-lifecycle-events = off
+      netty.tcp {
+        hostname = "127.0.0.1"
+        port = 2552
+       maximum-frame-size = 2097152
+       send-buffer-size = 52428800
+       receive-buffer-size = 52428800
+      }
+    }
+
+    cluster {
+      seed-nodes = ["akka.tcp://opendaylight-cluster@127.0.0.1:2550"]
+
+      auto-down-unreachable-after = 10s
+    }
+  }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/server.conf b/opendaylight/md-sal/sal-distributed-datastore/server.conf
new file mode 100644 (file)
index 0000000..6209adf
--- /dev/null
@@ -0,0 +1,37 @@
+
+ODLCluster{
+  akka {
+    actor {
+      serialize-messages = on
+
+      provider = "akka.cluster.ClusterActorRefProvider"
+      serializers {
+                java = "akka.serialization.JavaSerializer"
+                proto = "akka.remote.serialization.ProtobufSerializer"
+              }
+
+              serialization-bindings {
+                  "com.google.protobuf.Message" = proto
+                  "com.google.protobuf.GeneratedMessage" = proto
+                  "com.google.protobuf.GeneratedMessage$GeneratedExtension" = proto
+                  "com.google.protobuf.FieldSet" = proto
+              }
+    }
+    remote {
+      log-remote-lifecycle-events = off
+      netty.tcp {
+        hostname = "127.0.0.1"
+        port = 2550
+       maximum-frame-size = 2097152
+       send-buffer-size = 52428800
+       receive-buffer-size = 52428800
+      }
+    }
+
+    cluster {
+      seed-nodes = ["akka.tcp://opendaylight-cluster@127.0.0.1:2550"]
+
+      auto-down-unreachable-after = 10s
+    }
+  }
+}
index 955e4bbf220cc458cd20638967a91c849dab6909..abc69f18975aeb671713e955e11498f1c7d55050 100644 (file)
@@ -10,7 +10,8 @@ package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.cluster.example.protobuff.messages.KeyValueMessages;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.UnknownFieldSet;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages;
 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
@@ -43,7 +44,28 @@ public class CompositeModificationPayload extends Payload implements
         PersistentMessages.CompositeModification modification = payload
             .getExtension(
                 org.opendaylight.controller.mdsal.CompositeModificationPayload.modification);
-        payload.getExtension(KeyValueMessages.value);
+
+
+
+        // The extension was put in the unknown field.
+        // This is because extensions need to be registered
+        // see org.opendaylight.controller.mdsal.CompositeModificationPayload.registerAllExtensions
+        // also see https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/ExtensionRegistry
+        // If that is not done then on the other end the extension shows up as an unknown field
+        // Need to figure out a better way to do this
+        if(payload.getUnknownFields().hasField(2)){
+            UnknownFieldSet.Field field =
+                payload.getUnknownFields().getField(2);
+
+            try {
+                modification =
+                    PersistentMessages.CompositeModification
+                        .parseFrom(field.getLengthDelimitedList().get(0));
+            } catch (InvalidProtocolBufferException e) {
+
+            }
+        }
+
         return new CompositeModificationPayload(modification);
     }
 
index 3af6f56a2c78fe40ddd9cfa60ac5fe7bd60348c9..b435eda7a381560320825d44458580e3c165a403 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.Props;
 import akka.japi.Creator;
 import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
 import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
+import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -22,6 +23,7 @@ public class DataChangeListener extends AbstractUntypedActor {
     private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
     private final SchemaContext schemaContext;
     private final YangInstanceIdentifier pathId;
+    private boolean notificationsEnabled = false;
 
     public DataChangeListener(SchemaContext schemaContext,
                               AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, YangInstanceIdentifier pathId) {
@@ -32,15 +34,30 @@ public class DataChangeListener extends AbstractUntypedActor {
 
     @Override public void handleReceive(Object message) throws Exception {
         if(message.getClass().equals(DataChanged.SERIALIZABLE_CLASS)){
-            DataChanged reply = DataChanged.fromSerialize(schemaContext,message, pathId);
-            AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>
-                change = reply.getChange();
-            this.listener.onDataChanged(change);
+            dataChanged(message);
+        } else if(message instanceof EnableNotification){
+            enableNotification((EnableNotification) message);
+        }
+    }
 
-            if(getSender() != null){
-                getSender().tell(new DataChangedReply().toSerializable(), getSelf());
-            }
+    private void enableNotification(EnableNotification message) {
+        notificationsEnabled = message.isEnabled();
+    }
+
+    public void dataChanged(Object message) {
+
+        // Do nothing if notifications are not enabled
+        if(!notificationsEnabled){
+            return;
+        }
+
+        DataChanged reply = DataChanged.fromSerialize(schemaContext,message, pathId);
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>
+            change = reply.getChange();
+        this.listener.onDataChanged(change);
 
+        if(getSender() != null){
+            getSender().tell(new DataChangedReply().toSerializable(), getSelf());
         }
     }
 
index d21ea51c2a8feca7e9665dd1303f3288ec3de834..780f28f358ec327f5ec6ab9cace63f769695e55a 100644 (file)
@@ -86,18 +86,30 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
 
         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
 
-        Object result = actorContext.executeShardOperation(shardName,
+        Object result = actorContext.executeLocalShardOperation(shardName,
             new RegisterChangeListener(path, dataChangeListenerActor.path(),
                 scope).toSerializable(),
             ActorContext.ASK_DURATION
         );
 
-        RegisterChangeListenerReply reply = RegisterChangeListenerReply.fromSerializable(actorContext.getActorSystem(),result);
-        return new DataChangeListenerRegistrationProxy(actorContext.actorSelection(reply.getListenerRegistrationPath()), listener, dataChangeListenerActor);
+        if (result != null) {
+            RegisterChangeListenerReply reply = RegisterChangeListenerReply
+                .fromSerializable(actorContext.getActorSystem(), result);
+            return new DataChangeListenerRegistrationProxy(actorContext
+                .actorSelection(reply.getListenerRegistrationPath()), listener,
+                dataChangeListenerActor);
+        }
+
+        LOG.debug(
+            "No local shard for shardName {} was found so returning a noop registration",
+            shardName);
+        return new NoOpDataChangeListenerRegistration(listener);
     }
 
 
 
+
+
     @Override
     public DOMStoreTransactionChain createTransactionChain() {
         return new TransactionChainProxy(actorContext, executor, schemaContext);
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDataChangeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDataChangeListenerRegistration.java
new file mode 100644 (file)
index 0000000..14af31e
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * When a consumer registers a data change listener and no local shard is
+ * available to register that listener with then we return an instance of
+ * NoOpDataChangeListenerRegistration
+ *
+ * <p>
+ *
+ * The NoOpDataChangeListenerRegistration as it's name suggests does
+ * nothing when an operation is invoked on it
+ */
+public class NoOpDataChangeListenerRegistration
+    implements ListenerRegistration {
+
+    private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
+        listener;
+
+    public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> NoOpDataChangeListenerRegistration(
+        AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener) {
+
+        this.listener = listener;
+    }
+
+    @Override
+    public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
+        return listener;
+    }
+
+    @Override public void close() {
+
+    }
+}
index a7089a7f75c9cd01a5c336a5f400a71c30a605fd..23e27c9f5ff909b2c9b887aa5f7687b92d7cda10 100644 (file)
@@ -27,6 +27,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
@@ -44,7 +45,9 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
@@ -80,6 +83,8 @@ public class Shard extends RaftActor {
 
     private final ShardStats shardMBean;
 
+    private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
+
     private Shard(String name, Map<String, String> peerAddresses) {
         super(name, peerAddresses);
 
@@ -157,10 +162,23 @@ public class Shard extends RaftActor {
             modificationToCohort.remove(serialized);
         if (cohort == null) {
             LOG.error(
-                "Could not find cohort for modification : " + modification);
+                "Could not find cohort for modification : {}", modification);
             LOG.info("Writing modification using a new transaction");
-            modification.apply(store.newReadWriteTransaction());
-            return;
+            DOMStoreReadWriteTransaction transaction =
+                store.newReadWriteTransaction();
+            modification.apply(transaction);
+            DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+            ListenableFuture<Void> future =
+                commitCohort.preCommit();
+            try {
+                future.get();
+                future = commitCohort.commit();
+                future.get();
+            } catch (InterruptedException e) {
+                LOG.error("Failed to commit", e);
+            } catch (ExecutionException e) {
+                LOG.error("Failed to commit", e);
+            }
         }
 
         final ListenableFuture<Void> future = cohort.commit();
@@ -215,6 +233,16 @@ public class Shard extends RaftActor {
             .system().actorSelection(
                 registerChangeListener.getDataChangeListenerPath());
 
+
+        // Notify the listener if notifications should be enabled or not
+        // If this shard is the leader then it will enable notifications else
+        // it will not
+        dataChangeListenerPath.tell(new EnableNotification(isLeader()), getSelf());
+
+        // Now store a reference to the data change listener so it can be notified
+        // at a later point if notifications should be enabled or disabled
+        dataChangeListeners.add(dataChangeListenerPath);
+
         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
             listener = new DataChangeListenerProxy(schemaContext,dataChangeListenerPath);
 
@@ -250,7 +278,14 @@ public class Shard extends RaftActor {
         if(data instanceof CompositeModificationPayload){
             Object modification =
                 ((CompositeModificationPayload) data).getModification();
-            commit(clientActor, modification);
+
+            if(modification != null){
+                commit(clientActor, modification);
+            } else {
+                LOG.error("modification is null - this is very unexpected");
+            }
+
+
         } else {
             LOG.error("Unknown state received {}", data);
         }
@@ -265,6 +300,12 @@ public class Shard extends RaftActor {
         throw new UnsupportedOperationException("applySnapshot");
     }
 
+    @Override protected void onStateChanged() {
+        for(ActorSelection dataChangeListener : dataChangeListeners){
+            dataChangeListener.tell(new EnableNotification(isLeader()), getSelf());
+        }
+    }
+
     @Override public String persistenceId() {
         return this.name;
     }
index 5fbce4cd98900be65053939ca7a51d4f7a929a98..64c6821120f94f99a389c12700757a7b8c7266f5 100644 (file)
@@ -18,7 +18,10 @@ import akka.cluster.ClusterEvent;
 import akka.japi.Creator;
 import akka.japi.Function;
 import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
@@ -31,35 +34,27 @@ import java.util.Map;
 
 /**
  * The ShardManager has the following jobs,
- * <p>
+ * <ul>
  * <li> Create all the local shard replicas that belong on this cluster member
+ * <li> Find the address of the local shard
  * <li> Find the primary replica for any given shard
- * <li> Engage in shard replica elections which decide which replica should be the primary
- * </p>
- * <p/>
- * <h3>>Creation of Shard replicas</h3
- * <p>
- * When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas
- * belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service.
- * </p>
- * <p/>
- * <h3> Replica Elections </h3>
- * <p/>
- * <p>
- * The Shard Manager uses multiple cues to initiate election.
- * <li> When a member of the cluster dies
- * <li> When a local shard replica dies
- * <li> When a local shard replica comes alive
- * </p>
+ * <li> Monitor the cluster members and store their addresses
+ * <ul>
  */
 public class ShardManager extends AbstractUntypedActor {
 
     // Stores a mapping between a member name and the address of the member
+    // Member names look like "member-1", "member-2" etc and are as specified
+    // in configuration
     private final Map<String, Address> memberNameToAddress = new HashMap<>();
 
+    // Stores a mapping between a shard name and it's corresponding information
+    // Shard names look like inventory, topology etc and are as specified in
+    // configuration
     private final Map<String, ShardInformation> localShards = new HashMap<>();
 
-
+    // The type of a ShardManager reflects the type of the datastore itself
+    // A data store could be of type config/operational
     private final String type;
 
     private final ClusterWrapper cluster;
@@ -102,7 +97,8 @@ public class ShardManager extends AbstractUntypedActor {
         if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
             findPrimary(
                 FindPrimary.fromSerializable(message));
-
+        } else if(message instanceof FindLocalShard){
+            findLocalShard((FindLocalShard) message);
         } else if (message instanceof UpdateSchemaContext) {
             updateSchemaContext(message);
         } else if (message instanceof ClusterEvent.MemberUp){
@@ -117,6 +113,18 @@ public class ShardManager extends AbstractUntypedActor {
 
     }
 
+    private void findLocalShard(FindLocalShard message) {
+        ShardInformation shardInformation =
+            localShards.get(message.getShardName());
+
+        if(shardInformation != null){
+            getSender().tell(new LocalShardFound(shardInformation.getActor()), getSelf());
+            return;
+        }
+
+        getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
+    }
+
     private void ignoreMessage(Object message){
         LOG.debug("Unhandled message : " + message);
     }
@@ -137,6 +145,11 @@ public class ShardManager extends AbstractUntypedActor {
         }
     }
 
+    /**
+     * Notifies all the local shards of a change in the schema context
+     *
+     * @param message
+     */
     private void updateSchemaContext(Object message) {
         for(ShardInformation info : localShards.values()){
             info.getActor().tell(message,getSelf());
@@ -180,10 +193,7 @@ public class ShardManager extends AbstractUntypedActor {
         getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
     }
 
-    private String
-
-
-    getShardActorPath(String shardName, String memberName) {
+    private String getShardActorPath(String shardName, String memberName) {
         Address address = memberNameToAddress.get(memberName);
         if(address != null) {
             return address.toString() + "/user/shardmanager-" + this.type + "/"
@@ -193,11 +203,23 @@ public class ShardManager extends AbstractUntypedActor {
         return null;
     }
 
+    /**
+     * Construct the name of the shard actor given the name of the member on
+     * which the shard resides and the name of the shard
+     *
+     * @param memberName
+     * @param shardName
+     * @return
+     */
     private String getShardActorName(String memberName, String shardName){
         return memberName + "-shard-" + shardName + "-" + this.type;
     }
 
-    // Create the shards that are local to this member
+    /**
+     * Create shards that are local to the member on which the ShardManager
+     * runs
+     *
+     */
     private void createLocalShards() {
         String memberName = this.cluster.getCurrentMemberName();
         List<String> memberShardNames =
@@ -214,6 +236,12 @@ public class ShardManager extends AbstractUntypedActor {
 
     }
 
+    /**
+     * Given the name of the shard find the addresses of all it's peers
+     *
+     * @param shardName
+     * @return
+     */
     private Map<String, String> getPeerAddresses(String shardName){
 
         Map<String, String> peerAddresses = new HashMap<>();
index cbd61b2087ac3c2bfd38f77ea3a34ed51d6786d8..333a8f8617040ab3540ba6be39f6848085236df4 100644 (file)
@@ -12,12 +12,11 @@ import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
-
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListeningExecutorService;
-
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
@@ -200,9 +199,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
                 remoteTransactionPaths.put(shardName, transactionContext);
             }
-        } catch(TimeoutException e){
-            LOG.warn("Timed out trying to create transaction on shard {}: {}", shardName, e);
-            remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName));
+        } catch(TimeoutException | PrimaryNotFoundException e){
+            LOG.error("Creating NoOpTransaction because of : {}", e.getMessage());
+            remoteTransactionPaths.put(shardName,
+                new NoOpTransactionContext(shardName));
         }
     }
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EnableNotification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EnableNotification.java
new file mode 100644 (file)
index 0000000..67dab7e
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class EnableNotification {
+    private final boolean enabled;
+
+    public EnableNotification(boolean enabled) {
+        this.enabled = enabled;
+    }
+
+    public boolean isEnabled() {
+        return enabled;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindLocalShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindLocalShard.java
new file mode 100644 (file)
index 0000000..c415db6
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+/**
+ * FindLocalShard is a message that should be sent to the {@link org.opendaylight.controller.cluster.datastore.ShardManager}
+ * when we need to find a reference to a LocalShard
+ */
+public class FindLocalShard {
+    private final String shardName;
+
+    public FindLocalShard(String shardName) {
+        this.shardName = shardName;
+    }
+
+    public String getShardName() {
+        return shardName;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalShardFound.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalShardFound.java
new file mode 100644 (file)
index 0000000..feea38f
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.actor.ActorRef;
+
+/**
+ * LocalShardFound is a message that is sent by the {@link org.opendaylight.controller.cluster.datastore.ShardManager}
+ * when it finds a shard with the specified name in it's local shard registry
+ */
+public class LocalShardFound {
+    private final ActorRef path;
+
+    public LocalShardFound(ActorRef path) {
+        this.path = path;
+    }
+
+    public ActorRef getPath() {
+        return path;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalShardNotFound.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalShardNotFound.java
new file mode 100644 (file)
index 0000000..f6c6634
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+/**
+ * LocalShardNotFound is a message that is sent by the {@link org.opendaylight.controller.cluster.datastore.ShardManager}
+ * when it cannot locate a shard in it's local registry with the shardName specified
+ */
+public class LocalShardNotFound {
+    private final String shardName;
+
+    /**
+     *
+     * @param shardName the name of the shard that could not be found
+     */
+    public LocalShardNotFound(String shardName) {
+        this.shardName = shardName;
+    }
+
+    public String getShardName() {
+        return shardName;
+    }
+}
index ac0893da5ad157c230fad2aab7401b27d0054751..4706c66e2594eae1384b465bf5d0b246c72d8223 100644 (file)
@@ -18,7 +18,9 @@ import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
@@ -91,6 +93,29 @@ public class ActorContext {
         return actorSystem.actorSelection(path);
     }
 
+    /**
+     * Finds a local shard given it's shard name and return it's ActorRef
+     *
+     * @param shardName the name of the local shard that needs to be found
+     * @return a reference to a local shard actor which represents the shard
+     *         specified by the shardName
+     */
+    public ActorRef findLocalShard(String shardName) {
+        Object result = executeLocalOperation(shardManager,
+            new FindLocalShard(shardName), ASK_DURATION);
+
+        if (result instanceof LocalShardFound) {
+            LocalShardFound found = (LocalShardFound) result;
+
+            LOG.debug("Local shard found {}", found.getPath());
+
+            return found.getPath();
+        }
+
+        return null;
+    }
+
+
     public String findPrimaryPath(String shardName) {
         Object result = executeLocalOperation(shardManager,
             new FindPrimary(shardName).toSerializable(), ASK_DURATION);
@@ -170,6 +195,34 @@ public class ActorContext {
         return executeRemoteOperation(primary, message, duration);
     }
 
+    /**
+     * Execute an operation on the the local shard only
+     * <p>
+     *     This method first finds the address of the local shard if any. It then
+     *     executes the operation on it.
+     * </p>
+     *
+     * @param shardName the name of the shard on which the operation needs to be executed
+     * @param message the message that needs to be sent to the shard
+     * @param duration the time duration in which this operation should complete
+     * @return the message that was returned by the local actor on which the
+     *         the operation was executed. If a local shard was not found then
+     *         null is returned
+     * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
+     *         if the operation does not complete in a specified time duration
+     */
+    public Object executeLocalShardOperation(String shardName, Object message,
+        FiniteDuration duration) {
+        ActorRef local = findLocalShard(shardName);
+
+        if(local != null) {
+            return executeLocalOperation(local, message, duration);
+        }
+
+        return null;
+    }
+
+
     public void shutdown() {
         shardManager.tell(PoisonPill.getInstance(), null);
         actorSystem.shutdown();
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java
new file mode 100644 (file)
index 0000000..400eab1
--- /dev/null
@@ -0,0 +1,84 @@
+package org.opendaylight.controller.cluster.datastore;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class CompositeModificationPayloadTest {
+
+
+    private static final String SERIALIZE_OUT = "serialize.out";
+
+    @After
+    public void shutDown(){
+        File f = new File(SERIALIZE_OUT);
+        if(f.exists()){
+            f.delete();
+        }
+    }
+
+    @Test
+    public void testBasic() throws IOException {
+
+        List<ReplicatedLogEntry> entries = new ArrayList<>();
+
+        entries.add(0, new ReplicatedLogEntry() {
+            @Override public Payload getData() {
+                WriteModification writeModification =
+                    new WriteModification(TestModel.TEST_PATH, ImmutableNodes
+                        .containerNode(TestModel.TEST_QNAME),
+                        TestModel.createTestContext());
+
+                MutableCompositeModification compositeModification =
+                    new MutableCompositeModification();
+
+                compositeModification.addModification(writeModification);
+
+                return new CompositeModificationPayload(compositeModification.toSerializable());
+            }
+
+            @Override public long getTerm() {
+                return 1;
+            }
+
+            @Override public long getIndex() {
+                return 1;
+            }
+        });
+
+        AppendEntries appendEntries =
+            new AppendEntries(1, "member-1", 0, 100, entries, 1);
+
+        AppendEntriesMessages.AppendEntries o = (AppendEntriesMessages.AppendEntries) appendEntries.toSerializable();
+
+        o.writeDelimitedTo(new FileOutputStream(SERIALIZE_OUT));
+
+        AppendEntriesMessages.AppendEntries appendEntries2 =
+            AppendEntriesMessages.AppendEntries
+                .parseDelimitedFrom(new FileInputStream(SERIALIZE_OUT));
+
+        AppendEntries appendEntries1 = AppendEntries.fromSerializable(appendEntries2);
+
+        Payload data = appendEntries1.getEntries().get(0).getData();
+
+
+        Assert.assertTrue(((CompositeModificationPayload) data).getModification().toString().contains(TestModel.TEST_QNAME.getNamespace().toString()));
+
+    }
+
+}
index fd610322201f1ffb168640d413f5afe0c8d743d7..c4ec8b45fc2726cfda7911287b0c59b6b82f9edd 100644 (file)
@@ -6,6 +6,7 @@ import akka.testkit.JavaTestKit;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
 import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
+import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.md.cluster.datastore.model.CompositeModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
@@ -86,24 +87,28 @@ public class DataChangeListenerTest extends AbstractActorTest {
     }
 
     @Test
-    public void testDataChanged(){
+    public void testDataChangedWhenNotificationsAreEnabled(){
         new JavaTestKit(getSystem()) {{
             final MockDataChangeListener listener = new MockDataChangeListener();
             final Props props = DataChangeListener.props(CompositeModel.createTestContext(),listener,CompositeModel.FAMILY_PATH );
             final ActorRef subject =
-                getSystem().actorOf(props, "testDataChanged");
+                getSystem().actorOf(props, "testDataChangedNotificationsEnabled");
 
             new Within(duration("1 seconds")) {
                 protected void run() {
 
+                    // Let the DataChangeListener know that notifications should
+                    // be enabled
+                    subject.tell(new EnableNotification(true), getRef());
+
                     subject.tell(
                         new DataChanged(CompositeModel.createTestContext(),new MockDataChangedEvent()).toSerializable(),
                         getRef());
 
-                    final Boolean out = new ExpectMsg<Boolean>("dataChanged") {
+                    final Boolean out = new ExpectMsg<Boolean>(duration("800 millis"), "dataChanged") {
                         // do not put code outside this method, will run afterwards
                         protected Boolean match(Object in) {
-                            if (in.getClass().equals(DataChangedReply.SERIALIZABLE_CLASS)) {
+                            if (in != null && in.getClass().equals(DataChangedReply.SERIALIZABLE_CLASS)) {
 
                                 return true;
                             } else {
@@ -115,7 +120,30 @@ public class DataChangeListenerTest extends AbstractActorTest {
                     assertTrue(out);
                     assertTrue(listener.gotIt());
                     assertNotNull(listener.getChange().getCreatedData());
-                    // Will wait for the rest of the 3 seconds
+
+                    expectNoMsg();
+                }
+
+
+            };
+        }};
+    }
+
+    @Test
+    public void testDataChangedWhenNotificationsAreDisabled(){
+        new JavaTestKit(getSystem()) {{
+            final MockDataChangeListener listener = new MockDataChangeListener();
+            final Props props = DataChangeListener.props(CompositeModel.createTestContext(),listener,CompositeModel.FAMILY_PATH );
+            final ActorRef subject =
+                getSystem().actorOf(props, "testDataChangedNotificationsDisabled");
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    subject.tell(
+                        new DataChanged(CompositeModel.createTestContext(),new MockDataChangedEvent()).toSerializable(),
+                        getRef());
+
                     expectNoMsg();
                 }
 
index 5f4ac57da04d2163f30ac519b7aa6c6e72da3752..0a0c04b91586cbbda3d41dc24c4daca83ae78ae3 100644 (file)
@@ -5,7 +5,7 @@ import akka.testkit.JavaTestKit;
 
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
-
+import junit.framework.Assert;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -56,9 +56,7 @@ public class DistributedDataStoreIntegrationTest{
 
         distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
 
-        // This sleep is fragile - test can fail intermittently if all Shards aren't updated with
-        // the SchemaContext in time. Is there any way we can make this deterministic?
-        Thread.sleep(2000);
+        Thread.sleep(1500);
 
         DOMStoreReadWriteTransaction transaction =
             distributedDataStore.newReadWriteTransaction();
@@ -70,6 +68,8 @@ public class DistributedDataStoreIntegrationTest{
 
         Optional<NormalizedNode<?, ?>> optional = future.get();
 
+        Assert.assertTrue(optional.isPresent());
+
         NormalizedNode<?, ?> normalizedNode = optional.get();
 
         assertEquals(TestModel.TEST_QNAME, normalizedNode.getNodeType());
index 23a1ed49315ec827baa95c41b51dfe4d23159d4e..03191f70f1ab5979d8bcc6189b2ddece6616c867 100644 (file)
@@ -54,8 +54,8 @@ public class DistributedDataStoreTest extends AbstractActorTest{
     }
 
     @org.junit.Test
-    public void testRegisterChangeListener() throws Exception {
-        mockActorContext.setExecuteShardOperationResponse(new RegisterChangeListenerReply(doNothingActorRef.path()).toSerializable());
+    public void testRegisterChangeListenerWhenShardIsNotLocal() throws Exception {
+
         ListenerRegistration registration =
                 distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
             @Override
@@ -64,9 +64,31 @@ public class DistributedDataStoreTest extends AbstractActorTest{
             }
         }, AsyncDataBroker.DataChangeScope.BASE);
 
+        // Since we do not expect the shard to be local registration will return a NoOpRegistration
+        Assert.assertTrue(registration instanceof NoOpDataChangeListenerRegistration);
+
+        Assert.assertNotNull(registration);
+    }
+
+    @org.junit.Test
+    public void testRegisterChangeListenerWhenShardIsLocal() throws Exception {
+
+        mockActorContext.setExecuteLocalShardOperationResponse(new RegisterChangeListenerReply(doNothingActorRef.path()).toSerializable());
+
+        ListenerRegistration registration =
+            distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
+                @Override
+                public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+                    throw new UnsupportedOperationException("onDataChanged");
+                }
+            }, AsyncDataBroker.DataChangeScope.BASE);
+
+        Assert.assertTrue(registration instanceof DataChangeListenerRegistrationProxy);
+
         Assert.assertNotNull(registration);
     }
 
+
     @org.junit.Test
     public void testCreateTransactionChain() throws Exception {
         final DOMStoreTransactionChain transactionChain = distributedDataStore.createTransactionChain();
index 87d257a3f217bf0bfac0b3c8ade1cbe8d4555b04..268ed3c27383f3254eafec08e74c8f287456456a 100644 (file)
@@ -1,5 +1,6 @@
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
@@ -8,13 +9,19 @@ import junit.framework.Assert;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
 import scala.concurrent.duration.Duration;
 
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 public class ShardManagerTest {
     private static ActorSystem system;
 
@@ -47,7 +54,6 @@ public class ShardManagerTest {
                     expectMsgEquals(Duration.Zero(),
                         new PrimaryNotFound("inventory").toSerializable());
 
-                    // Will wait for the rest of the 3 seconds
                     expectNoMsg();
                 }
             };
@@ -64,7 +70,6 @@ public class ShardManagerTest {
             final TestActorRef<ShardManager> subject =
                 TestActorRef.create(system, props);
 
-            // the run() method needs to finish within 3 seconds
             new Within(duration("1 seconds")) {
                 protected void run() {
 
@@ -78,6 +83,75 @@ public class ShardManagerTest {
         }};
     }
 
+    @Test
+    public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
+
+        new JavaTestKit(system) {{
+            final Props props = ShardManager
+                .props("config", new MockClusterWrapper(),
+                    new MockConfiguration());
+            final TestActorRef<ShardManager> subject =
+                TestActorRef.create(system, props);
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    subject.tell(new FindLocalShard("inventory"), getRef());
+
+                    final String out = new ExpectMsg<String>(duration("1 seconds"), "find local") {
+                        protected String match(Object in) {
+                            if (in instanceof LocalShardNotFound) {
+                                return ((LocalShardNotFound) in).getShardName();
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get(); // this extracts the received message
+
+                    assertEquals("inventory", out);
+
+                    expectNoMsg();
+                }
+            };
+        }};
+    }
+
+    @Test
+    public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
+
+        final MockClusterWrapper mockClusterWrapper = new MockClusterWrapper();
+
+        new JavaTestKit(system) {{
+            final Props props = ShardManager
+                .props("config", mockClusterWrapper,
+                    new MockConfiguration());
+            final TestActorRef<ShardManager> subject =
+                TestActorRef.create(system, props);
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    subject.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
+
+                    final ActorRef out = new ExpectMsg<ActorRef>(duration("1 seconds"), "find local") {
+                        protected ActorRef match(Object in) {
+                            if (in instanceof LocalShardFound) {
+                                return ((LocalShardFound) in).getPath();
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get(); // this extracts the received message
+
+                    assertTrue(out.path().toString(), out.path().toString().contains("member-1-shard-default-config"));
+
+
+                    expectNoMsg();
+                }
+            };
+        }};
+    }
+
     @Test
     public void testOnReceiveMemberUp() throws Exception {
 
index 7d57ea8284e90dc3f941b7b82385e5db28ed75f4..38920d86ca36b8889276b6f9823381d0cc1a4239 100644 (file)
@@ -7,6 +7,7 @@ import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
+import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
@@ -24,6 +25,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import static junit.framework.Assert.assertFalse;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -92,6 +94,19 @@ public class ShardTest extends AbstractActorTest {
                         getRef().path(), AsyncDataBroker.DataChangeScope.BASE).toSerializable(),
                         getRef());
 
+                    final Boolean notificationEnabled = new ExpectMsg<Boolean>("enable notification") {
+                        // do not put code outside this method, will run afterwards
+                        protected Boolean match(Object in) {
+                            if(in instanceof EnableNotification){
+                                return ((EnableNotification) in).isEnabled();
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get(); // this extracts the received message
+
+                    assertFalse(notificationEnabled);
+
                     final String out = new ExpectMsg<String>("match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
@@ -108,8 +123,6 @@ public class ShardTest extends AbstractActorTest {
 
                     assertTrue(out.matches(
                         "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
-                    // Will wait for the rest of the 3 seconds
-                    expectNoMsg();
                 }
 
 
index 7d9d2dad8136de096f244e9baf7dbb10a01d9c8d..0cd029c2ffb2da4c093ede214ca037d5a5a345dc 100644 (file)
@@ -13,6 +13,8 @@ import junit.framework.Assert;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
+import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
@@ -32,10 +34,17 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.util.List;
 import java.util.concurrent.Executors;
 
+import static junit.framework.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 public class TransactionProxyTest extends AbstractActorTest {
 
     private final Configuration configuration = new MockConfiguration();
@@ -121,6 +130,68 @@ public class TransactionProxyTest extends AbstractActorTest {
         Assert.assertFalse(normalizedNodeOptional.isPresent());
     }
 
+    @Test
+    public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Exception {
+        final ActorContext actorContext = mock(ActorContext.class);
+
+        when(actorContext.executeShardOperation(anyString(), any(), any(
+            FiniteDuration.class))).thenThrow(new PrimaryNotFoundException("test"));
+
+        TransactionProxy transactionProxy =
+            new TransactionProxy(actorContext,
+                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+
+
+        ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
+            transactionProxy.read(TestModel.TEST_PATH);
+
+        Assert.assertFalse(read.get().isPresent());
+
+    }
+
+
+    @Test
+    public void testReadWhenATimeoutExceptionIsThrown() throws Exception {
+        final ActorContext actorContext = mock(ActorContext.class);
+
+        when(actorContext.executeShardOperation(anyString(), any(), any(
+            FiniteDuration.class))).thenThrow(new TimeoutException("test", new Exception("reason")));
+
+        TransactionProxy transactionProxy =
+            new TransactionProxy(actorContext,
+                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+
+
+        ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
+            transactionProxy.read(TestModel.TEST_PATH);
+
+        Assert.assertFalse(read.get().isPresent());
+
+    }
+
+    @Test
+    public void testReadWhenAAnyOtherExceptionIsThrown() throws Exception {
+        final ActorContext actorContext = mock(ActorContext.class);
+
+        when(actorContext.executeShardOperation(anyString(), any(), any(
+            FiniteDuration.class))).thenThrow(new NullPointerException());
+
+        TransactionProxy transactionProxy =
+            new TransactionProxy(actorContext,
+                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+
+
+        try {
+            ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
+                transactionProxy.read(TestModel.TEST_PATH);
+            fail("A null pointer exception was expected");
+        } catch(NullPointerException e){
+
+        }
+    }
+
+
+
     @Test
     public void testWrite() throws Exception {
         final Props props = Props.create(MessageCollectorActor.class);
index 3dd0214e9b213c49a4821c303362f13eebe11371..5874eccda40f4f2d08f6b829757206213a743696 100644 (file)
@@ -2,12 +2,20 @@ package org.opendaylight.controller.cluster.datastore.utils;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.japi.Creator;
+import akka.testkit.JavaTestKit;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.mockito.Mockito.mock;
 
 public class ActorContextTest extends AbstractActorTest{
@@ -44,4 +52,145 @@ public class ActorContextTest extends AbstractActorTest{
         System.out.println(actorContext
             .actorFor("akka://system/user/shardmanager/shard/transaction"));
     }
+
+
+    private static class MockShardManager extends UntypedActor {
+
+        private final boolean found;
+        private final ActorRef actorRef;
+
+        private MockShardManager(boolean found, ActorRef actorRef){
+
+            this.found = found;
+            this.actorRef = actorRef;
+        }
+
+        @Override public void onReceive(Object message) throws Exception {
+            if(found){
+                getSender().tell(new LocalShardFound(actorRef), getSelf());
+            } else {
+                getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
+            }
+        }
+
+        private static Props props(final boolean found, final ActorRef actorRef){
+            return Props.create(new Creator<MockShardManager>() {
+
+                @Override public MockShardManager create()
+                    throws Exception {
+                    return new MockShardManager(found,
+                        actorRef);
+                }
+            });
+        }
+    }
+
+    @Test
+    public void testExecuteLocalShardOperationWithShardFound(){
+        new JavaTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+
+                    ActorRef shardManagerActorRef = getSystem()
+                        .actorOf(MockShardManager.props(true, shardActorRef));
+
+                    ActorContext actorContext =
+                        new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+                            mock(Configuration.class));
+
+                    Object out = actorContext.executeLocalShardOperation("default", "hello", duration("1 seconds"));
+
+                    assertEquals("hello", out);
+
+
+                    expectNoMsg();
+                }
+            };
+        }};
+
+    }
+
+    @Test
+    public void testExecuteLocalShardOperationWithShardNotFound(){
+        new JavaTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    ActorRef shardManagerActorRef = getSystem()
+                        .actorOf(MockShardManager.props(false, null));
+
+                    ActorContext actorContext =
+                        new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+                            mock(Configuration.class));
+
+                    Object out = actorContext.executeLocalShardOperation("default", "hello", duration("1 seconds"));
+
+                    assertNull(out);
+
+
+                    expectNoMsg();
+                }
+            };
+        }};
+
+    }
+
+
+    @Test
+    public void testFindLocalShardWithShardFound(){
+        new JavaTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+
+                    ActorRef shardManagerActorRef = getSystem()
+                        .actorOf(MockShardManager.props(true, shardActorRef));
+
+                    ActorContext actorContext =
+                        new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+                            mock(Configuration.class));
+
+                    Object out = actorContext.findLocalShard("default");
+
+                    assertEquals(shardActorRef, out);
+
+
+                    expectNoMsg();
+                }
+            };
+        }};
+
+    }
+
+    @Test
+    public void testFindLocalShardWithShardNotFound(){
+        new JavaTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    ActorRef shardManagerActorRef = getSystem()
+                        .actorOf(MockShardManager.props(false, null));
+
+                    ActorContext actorContext =
+                        new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+                            mock(Configuration.class));
+
+                    Object out = actorContext.findLocalShard("default");
+
+                    assertNull(out);
+
+
+                    expectNoMsg();
+                }
+            };
+        }};
+
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/EchoActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/EchoActor.java
new file mode 100644 (file)
index 0000000..fe88afe
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import akka.actor.UntypedActor;
+
+/**
+ * The EchoActor simply responds back with the same message that it receives
+ */
+public class EchoActor extends UntypedActor{
+
+    @Override public void onReceive(Object message) throws Exception {
+        getSender().tell(message, getSelf());
+    }
+}
index 1d1e6614886e32d5593112375cbefaab0a9c1507..5d3853f311880d791fc177c9a1082f026fb1c5de 100644 (file)
@@ -19,6 +19,7 @@ public class MockActorContext extends ActorContext {
     private Object executeShardOperationResponse;
     private Object executeRemoteOperationResponse;
     private Object executeLocalOperationResponse;
+    private Object executeLocalShardOperationResponse;
 
     public MockActorContext(ActorSystem actorSystem) {
         super(actorSystem, null, new MockClusterWrapper(), new MockConfiguration());
@@ -56,8 +57,18 @@ public class MockActorContext extends ActorContext {
         this.executeLocalOperationResponse = executeLocalOperationResponse;
     }
 
+    public void setExecuteLocalShardOperationResponse(
+        Object executeLocalShardOperationResponse) {
+        this.executeLocalShardOperationResponse = executeLocalShardOperationResponse;
+    }
+
     @Override public Object executeLocalOperation(ActorRef actor,
         Object message, FiniteDuration duration) {
         return this.executeLocalOperationResponse;
     }
+
+    @Override public Object executeLocalShardOperation(String shardName,
+        Object message, FiniteDuration duration) {
+        return this.executeLocalShardOperationResponse;
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java
new file mode 100644 (file)
index 0000000..2671be8
--- /dev/null
@@ -0,0 +1,120 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.programs.appendentries;
+
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import com.typesafe.config.ConfigFactory;
+import org.opendaylight.controller.cluster.datastore.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.example.messages.KeyValue;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class Client {
+
+    private static ActorSystem actorSystem;
+
+    public static class ClientActor extends UntypedActor {
+
+        @Override public void onReceive(Object message) throws Exception {
+
+        }
+    }
+
+    public static void main(String[] args){
+        actorSystem = ActorSystem.create("appendentries", ConfigFactory
+            .load().getConfig("ODLCluster"));
+
+        ActorSelection actorSelection = actorSystem.actorSelection(
+            "akka.tcp://appendentries@127.0.0.1:2550/user/server");
+
+        AppendEntries appendEntries = modificationAppendEntries();
+
+        Payload data = appendEntries.getEntries().get(0).getData();
+        if(data instanceof CompositeModificationPayload) {
+            System.out.println(
+                "Sending : " + ((CompositeModificationPayload) data)
+                    .getModification());
+        } else {
+            System.out.println(
+                "Sending : " + ((KeyValue) data)
+                    .getKey());
+
+        }
+
+        actorSelection.tell(appendEntries.toSerializable(), null);
+
+
+
+
+        actorSystem.actorOf(Props.create(ClientActor.class), "client");
+    }
+
+    public static AppendEntries modificationAppendEntries() {
+        List<ReplicatedLogEntry> modification = new ArrayList<>();
+
+        modification.add(0, new ReplicatedLogEntry() {
+            @Override public Payload getData() {
+                WriteModification writeModification =
+                    new WriteModification(TestModel.TEST_PATH, ImmutableNodes
+                        .containerNode(TestModel.TEST_QNAME),
+                        TestModel.createTestContext()
+                    );
+
+                MutableCompositeModification compositeModification =
+                    new MutableCompositeModification();
+
+                compositeModification.addModification(writeModification);
+
+                return new CompositeModificationPayload(
+                    compositeModification.toSerializable());
+            }
+
+            @Override public long getTerm() {
+                return 1;
+            }
+
+            @Override public long getIndex() {
+                return 1;
+            }
+        });
+
+        return new AppendEntries(1, "member-1", 0, 100, modification, 1);
+    }
+
+    public static AppendEntries keyValueAppendEntries() {
+        List<ReplicatedLogEntry> modification = new ArrayList<>();
+
+        modification.add(0, new ReplicatedLogEntry() {
+            @Override public Payload getData() {
+                return new KeyValue("moiz", "test");
+            }
+
+            @Override public long getTerm() {
+                return 1;
+            }
+
+            @Override public long getIndex() {
+                return 1;
+            }
+        });
+
+        return new AppendEntries(1, "member-1", 0, 100, modification, 1);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Server.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Server.java
new file mode 100644 (file)
index 0000000..0e6d535
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.programs.appendentries;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import com.typesafe.config.ConfigFactory;
+import org.opendaylight.controller.cluster.datastore.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.example.messages.KeyValue;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+
+public class Server {
+
+    private static ActorSystem actorSystem;
+
+    public static class ServerActor extends UntypedActor {
+
+        @Override public void onReceive(Object message) throws Exception {
+            if(AppendEntries.SERIALIZABLE_CLASS.equals(message.getClass())){
+                AppendEntries appendEntries =
+                    AppendEntries.fromSerializable(message);
+
+                Payload data = appendEntries.getEntries()
+                    .get(0).getData();
+                if(data instanceof KeyValue){
+                    System.out.println("Received : " + ((KeyValue) appendEntries.getEntries().get(0).getData()).getKey());
+                } else {
+                    System.out.println("Received :" +
+                        ((CompositeModificationPayload) appendEntries
+                            .getEntries()
+                            .get(0).getData()).getModification().toString());
+                }
+            } else if(message instanceof String){
+                System.out.println(message);
+            }
+        }
+    }
+
+    public static void main(String[] args){
+        actorSystem = ActorSystem.create("appendentries", ConfigFactory
+            .load().getConfig("ODLCluster"));
+
+        actorSystem.actorOf(Props.create(ServerActor.class), "server");
+    }
+}