From b584e686fdeba863643f80c0894d7fbd2dcaa540 Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Sun, 3 Aug 2014 06:36:13 -0700 Subject: [PATCH] Keep DataChange registrations and notifications local Here is how it works -------------------- When a consumer registers for a datachange notification we first check if the Shard to which this registration needs to be sent is local. If it is then we send the registration to it. If it is not then we simply return a NoOpRegistration to the consumer. This ensures that a DataChange registration stays local to a cluster member. Now let's say we have 3 replicas and in all those replicas we do find a local shard for a given module and we have 3 listener registrations. At this time we do not want all 3 members to be notified of the change. We only want the node which is the Leader to notify it's listeners of the change. To restrict that the DataChangeListener will only forward events to the listener if the Shard sends it a message enabling notifications Change-Id: I035bbdf34e2509047f4bcb475c443a761b08e68d Signed-off-by: Moiz Raja --- .../cluster/example/ExampleActor.java | 4 + .../controller/cluster/raft/RaftActor.java | 10 ++ .../cluster/datastore/DataChangeListener.java | 31 +++- .../datastore/DistributedDataStore.java | 18 ++- .../NoOpDataChangeListenerRegistration.java | 46 ++++++ .../controller/cluster/datastore/Shard.java | 21 +++ .../cluster/datastore/ShardManager.java | 78 ++++++--- .../messages/EnableNotification.java | 21 +++ .../datastore/messages/FindLocalShard.java | 25 +++ .../datastore/messages/LocalShardFound.java | 27 ++++ .../messages/LocalShardNotFound.java | 29 ++++ .../cluster/datastore/utils/ActorContext.java | 53 +++++++ .../datastore/DataChangeListenerTest.java | 38 ++++- .../datastore/DistributedDataStoreTest.java | 26 ++- .../cluster/datastore/ShardManagerTest.java | 78 ++++++++- .../cluster/datastore/ShardTest.java | 17 +- .../datastore/utils/ActorContextTest.java | 149 ++++++++++++++++++ .../cluster/datastore/utils/EchoActor.java | 21 +++ .../datastore/utils/MockActorContext.java | 11 ++ 19 files changed, 657 insertions(+), 46 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDataChangeListenerRegistration.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EnableNotification.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindLocalShard.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalShardFound.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalShardNotFound.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/EchoActor.java diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index aa100df9d0..cbd7ca2d70 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -92,6 +92,10 @@ public class ExampleActor extends RaftActor { LOG.debug("Snapshot applied to state :" + ((HashMap) snapshot).size()); } + @Override protected void onStateChanged() { + + } + @Override public void onReceiveRecover(Object message) { super.onReceiveRecover(message); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 70b85b4627..caa0e507c1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -341,6 +341,13 @@ public abstract class RaftActor extends UntypedPersistentActor { */ protected abstract void applySnapshot(Object snapshot); + /** + * This method will be called by the RaftActor when the state of the + * RaftActor changes. The derived actor can then use methods like + * isLeader or getLeader to do something useful + */ + protected abstract void onStateChanged(); + private RaftActorBehavior switchBehavior(RaftState state) { if (currentBehavior != null) { if (currentBehavior.state() == state) { @@ -367,6 +374,9 @@ public abstract class RaftActor extends UntypedPersistentActor { } else { behavior = new Leader(context); } + + onStateChanged(); + return behavior; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java index 3af6f56a2c..b435eda7a3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java @@ -12,6 +12,7 @@ import akka.actor.Props; import akka.japi.Creator; import org.opendaylight.controller.cluster.datastore.messages.DataChanged; import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply; +import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -22,6 +23,7 @@ public class DataChangeListener extends AbstractUntypedActor { private final AsyncDataChangeListener> listener; private final SchemaContext schemaContext; private final YangInstanceIdentifier pathId; + private boolean notificationsEnabled = false; public DataChangeListener(SchemaContext schemaContext, AsyncDataChangeListener> listener, YangInstanceIdentifier pathId) { @@ -32,15 +34,30 @@ public class DataChangeListener extends AbstractUntypedActor { @Override public void handleReceive(Object message) throws Exception { if(message.getClass().equals(DataChanged.SERIALIZABLE_CLASS)){ - DataChanged reply = DataChanged.fromSerialize(schemaContext,message, pathId); - AsyncDataChangeEvent> - change = reply.getChange(); - this.listener.onDataChanged(change); + dataChanged(message); + } else if(message instanceof EnableNotification){ + enableNotification((EnableNotification) message); + } + } - if(getSender() != null){ - getSender().tell(new DataChangedReply().toSerializable(), getSelf()); - } + private void enableNotification(EnableNotification message) { + notificationsEnabled = message.isEnabled(); + } + + public void dataChanged(Object message) { + + // Do nothing if notifications are not enabled + if(!notificationsEnabled){ + return; + } + + DataChanged reply = DataChanged.fromSerialize(schemaContext,message, pathId); + AsyncDataChangeEvent> + change = reply.getChange(); + this.listener.onDataChanged(change); + if(getSender() != null){ + getSender().tell(new DataChangedReply().toSerializable(), getSelf()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index d21ea51c2a..780f28f358 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -86,18 +86,30 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au String shardName = ShardStrategyFactory.getStrategy(path).findShard(path); - Object result = actorContext.executeShardOperation(shardName, + Object result = actorContext.executeLocalShardOperation(shardName, new RegisterChangeListener(path, dataChangeListenerActor.path(), scope).toSerializable(), ActorContext.ASK_DURATION ); - RegisterChangeListenerReply reply = RegisterChangeListenerReply.fromSerializable(actorContext.getActorSystem(),result); - return new DataChangeListenerRegistrationProxy(actorContext.actorSelection(reply.getListenerRegistrationPath()), listener, dataChangeListenerActor); + if (result != null) { + RegisterChangeListenerReply reply = RegisterChangeListenerReply + .fromSerializable(actorContext.getActorSystem(), result); + return new DataChangeListenerRegistrationProxy(actorContext + .actorSelection(reply.getListenerRegistrationPath()), listener, + dataChangeListenerActor); + } + + LOG.debug( + "No local shard for shardName {} was found so returning a noop registration", + shardName); + return new NoOpDataChangeListenerRegistration(listener); } + + @Override public DOMStoreTransactionChain createTransactionChain() { return new TransactionChainProxy(actorContext, executor, schemaContext); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDataChangeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDataChangeListenerRegistration.java new file mode 100644 index 0000000000..14af31e898 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDataChangeListenerRegistration.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore; + +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +/** + * When a consumer registers a data change listener and no local shard is + * available to register that listener with then we return an instance of + * NoOpDataChangeListenerRegistration + * + *

+ * + * The NoOpDataChangeListenerRegistration as it's name suggests does + * nothing when an operation is invoked on it + */ +public class NoOpDataChangeListenerRegistration + implements ListenerRegistration { + + private final AsyncDataChangeListener> + listener; + + public >> NoOpDataChangeListenerRegistration( + AsyncDataChangeListener> listener) { + + this.listener = listener; + } + + @Override + public AsyncDataChangeListener> getInstance() { + return listener; + } + + @Override public void close() { + + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 46f09217d0..23e27c9f5f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -27,6 +27,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; @@ -44,7 +45,9 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -80,6 +83,8 @@ public class Shard extends RaftActor { private final ShardStats shardMBean; + private final List dataChangeListeners = new ArrayList<>(); + private Shard(String name, Map peerAddresses) { super(name, peerAddresses); @@ -228,6 +233,16 @@ public class Shard extends RaftActor { .system().actorSelection( registerChangeListener.getDataChangeListenerPath()); + + // Notify the listener if notifications should be enabled or not + // If this shard is the leader then it will enable notifications else + // it will not + dataChangeListenerPath.tell(new EnableNotification(isLeader()), getSelf()); + + // Now store a reference to the data change listener so it can be notified + // at a later point if notifications should be enabled or disabled + dataChangeListeners.add(dataChangeListenerPath); + AsyncDataChangeListener> listener = new DataChangeListenerProxy(schemaContext,dataChangeListenerPath); @@ -285,6 +300,12 @@ public class Shard extends RaftActor { throw new UnsupportedOperationException("applySnapshot"); } + @Override protected void onStateChanged() { + for(ActorSelection dataChangeListener : dataChangeListeners){ + dataChangeListener.tell(new EnableNotification(isLeader()), getSelf()); + } + } + @Override public String persistenceId() { return this.name; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 5fbce4cd98..64c6821120 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -18,7 +18,10 @@ import akka.cluster.ClusterEvent; import akka.japi.Creator; import akka.japi.Function; import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; +import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; @@ -31,35 +34,27 @@ import java.util.Map; /** * The ShardManager has the following jobs, - *

+ *

    *
  • Create all the local shard replicas that belong on this cluster member + *
  • Find the address of the local shard *
  • Find the primary replica for any given shard - *
  • Engage in shard replica elections which decide which replica should be the primary - *

    - *

    - *

    >Creation of Shard replicas

    - * When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas - * belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service. - *

    - *

    - *

    Replica Elections

    - *

    - *

    - * The Shard Manager uses multiple cues to initiate election. - *

  • When a member of the cluster dies - *
  • When a local shard replica dies - *
  • When a local shard replica comes alive - *

    + *
  • Monitor the cluster members and store their addresses + *
      */ public class ShardManager extends AbstractUntypedActor { // Stores a mapping between a member name and the address of the member + // Member names look like "member-1", "member-2" etc and are as specified + // in configuration private final Map memberNameToAddress = new HashMap<>(); + // Stores a mapping between a shard name and it's corresponding information + // Shard names look like inventory, topology etc and are as specified in + // configuration private final Map localShards = new HashMap<>(); - + // The type of a ShardManager reflects the type of the datastore itself + // A data store could be of type config/operational private final String type; private final ClusterWrapper cluster; @@ -102,7 +97,8 @@ public class ShardManager extends AbstractUntypedActor { if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) { findPrimary( FindPrimary.fromSerializable(message)); - + } else if(message instanceof FindLocalShard){ + findLocalShard((FindLocalShard) message); } else if (message instanceof UpdateSchemaContext) { updateSchemaContext(message); } else if (message instanceof ClusterEvent.MemberUp){ @@ -117,6 +113,18 @@ public class ShardManager extends AbstractUntypedActor { } + private void findLocalShard(FindLocalShard message) { + ShardInformation shardInformation = + localShards.get(message.getShardName()); + + if(shardInformation != null){ + getSender().tell(new LocalShardFound(shardInformation.getActor()), getSelf()); + return; + } + + getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf()); + } + private void ignoreMessage(Object message){ LOG.debug("Unhandled message : " + message); } @@ -137,6 +145,11 @@ public class ShardManager extends AbstractUntypedActor { } } + /** + * Notifies all the local shards of a change in the schema context + * + * @param message + */ private void updateSchemaContext(Object message) { for(ShardInformation info : localShards.values()){ info.getActor().tell(message,getSelf()); @@ -180,10 +193,7 @@ public class ShardManager extends AbstractUntypedActor { getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf()); } - private String - - - getShardActorPath(String shardName, String memberName) { + private String getShardActorPath(String shardName, String memberName) { Address address = memberNameToAddress.get(memberName); if(address != null) { return address.toString() + "/user/shardmanager-" + this.type + "/" @@ -193,11 +203,23 @@ public class ShardManager extends AbstractUntypedActor { return null; } + /** + * Construct the name of the shard actor given the name of the member on + * which the shard resides and the name of the shard + * + * @param memberName + * @param shardName + * @return + */ private String getShardActorName(String memberName, String shardName){ return memberName + "-shard-" + shardName + "-" + this.type; } - // Create the shards that are local to this member + /** + * Create shards that are local to the member on which the ShardManager + * runs + * + */ private void createLocalShards() { String memberName = this.cluster.getCurrentMemberName(); List memberShardNames = @@ -214,6 +236,12 @@ public class ShardManager extends AbstractUntypedActor { } + /** + * Given the name of the shard find the addresses of all it's peers + * + * @param shardName + * @return + */ private Map getPeerAddresses(String shardName){ Map peerAddresses = new HashMap<>(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EnableNotification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EnableNotification.java new file mode 100644 index 0000000000..67dab7e663 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EnableNotification.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.messages; + +public class EnableNotification { + private final boolean enabled; + + public EnableNotification(boolean enabled) { + this.enabled = enabled; + } + + public boolean isEnabled() { + return enabled; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindLocalShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindLocalShard.java new file mode 100644 index 0000000000..c415db6efe --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindLocalShard.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.messages; + +/** + * FindLocalShard is a message that should be sent to the {@link org.opendaylight.controller.cluster.datastore.ShardManager} + * when we need to find a reference to a LocalShard + */ +public class FindLocalShard { + private final String shardName; + + public FindLocalShard(String shardName) { + this.shardName = shardName; + } + + public String getShardName() { + return shardName; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalShardFound.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalShardFound.java new file mode 100644 index 0000000000..feea38f3e3 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalShardFound.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.messages; + +import akka.actor.ActorRef; + +/** + * LocalShardFound is a message that is sent by the {@link org.opendaylight.controller.cluster.datastore.ShardManager} + * when it finds a shard with the specified name in it's local shard registry + */ +public class LocalShardFound { + private final ActorRef path; + + public LocalShardFound(ActorRef path) { + this.path = path; + } + + public ActorRef getPath() { + return path; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalShardNotFound.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalShardNotFound.java new file mode 100644 index 0000000000..f6c6634e1a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalShardNotFound.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.messages; + +/** + * LocalShardNotFound is a message that is sent by the {@link org.opendaylight.controller.cluster.datastore.ShardManager} + * when it cannot locate a shard in it's local registry with the shardName specified + */ +public class LocalShardNotFound { + private final String shardName; + + /** + * + * @param shardName the name of the shard that could not be found + */ + public LocalShardNotFound(String shardName) { + this.shardName = shardName; + } + + public String getShardName() { + return shardName; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index ac0893da5a..4706c66e25 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -18,7 +18,9 @@ import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; +import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; @@ -91,6 +93,29 @@ public class ActorContext { return actorSystem.actorSelection(path); } + /** + * Finds a local shard given it's shard name and return it's ActorRef + * + * @param shardName the name of the local shard that needs to be found + * @return a reference to a local shard actor which represents the shard + * specified by the shardName + */ + public ActorRef findLocalShard(String shardName) { + Object result = executeLocalOperation(shardManager, + new FindLocalShard(shardName), ASK_DURATION); + + if (result instanceof LocalShardFound) { + LocalShardFound found = (LocalShardFound) result; + + LOG.debug("Local shard found {}", found.getPath()); + + return found.getPath(); + } + + return null; + } + + public String findPrimaryPath(String shardName) { Object result = executeLocalOperation(shardManager, new FindPrimary(shardName).toSerializable(), ASK_DURATION); @@ -170,6 +195,34 @@ public class ActorContext { return executeRemoteOperation(primary, message, duration); } + /** + * Execute an operation on the the local shard only + *

      + * This method first finds the address of the local shard if any. It then + * executes the operation on it. + *

      + * + * @param shardName the name of the shard on which the operation needs to be executed + * @param message the message that needs to be sent to the shard + * @param duration the time duration in which this operation should complete + * @return the message that was returned by the local actor on which the + * the operation was executed. If a local shard was not found then + * null is returned + * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException + * if the operation does not complete in a specified time duration + */ + public Object executeLocalShardOperation(String shardName, Object message, + FiniteDuration duration) { + ActorRef local = findLocalShard(shardName); + + if(local != null) { + return executeLocalOperation(local, message, duration); + } + + return null; + } + + public void shutdown() { shardManager.tell(PoisonPill.getInstance(), null); actorSystem.shutdown(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java index fd61032220..c4ec8b45fc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java @@ -6,6 +6,7 @@ import akka.testkit.JavaTestKit; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.messages.DataChanged; import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply; +import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.md.cluster.datastore.model.CompositeModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; @@ -86,24 +87,28 @@ public class DataChangeListenerTest extends AbstractActorTest { } @Test - public void testDataChanged(){ + public void testDataChangedWhenNotificationsAreEnabled(){ new JavaTestKit(getSystem()) {{ final MockDataChangeListener listener = new MockDataChangeListener(); final Props props = DataChangeListener.props(CompositeModel.createTestContext(),listener,CompositeModel.FAMILY_PATH ); final ActorRef subject = - getSystem().actorOf(props, "testDataChanged"); + getSystem().actorOf(props, "testDataChangedNotificationsEnabled"); new Within(duration("1 seconds")) { protected void run() { + // Let the DataChangeListener know that notifications should + // be enabled + subject.tell(new EnableNotification(true), getRef()); + subject.tell( new DataChanged(CompositeModel.createTestContext(),new MockDataChangedEvent()).toSerializable(), getRef()); - final Boolean out = new ExpectMsg("dataChanged") { + final Boolean out = new ExpectMsg(duration("800 millis"), "dataChanged") { // do not put code outside this method, will run afterwards protected Boolean match(Object in) { - if (in.getClass().equals(DataChangedReply.SERIALIZABLE_CLASS)) { + if (in != null && in.getClass().equals(DataChangedReply.SERIALIZABLE_CLASS)) { return true; } else { @@ -115,7 +120,30 @@ public class DataChangeListenerTest extends AbstractActorTest { assertTrue(out); assertTrue(listener.gotIt()); assertNotNull(listener.getChange().getCreatedData()); - // Will wait for the rest of the 3 seconds + + expectNoMsg(); + } + + + }; + }}; + } + + @Test + public void testDataChangedWhenNotificationsAreDisabled(){ + new JavaTestKit(getSystem()) {{ + final MockDataChangeListener listener = new MockDataChangeListener(); + final Props props = DataChangeListener.props(CompositeModel.createTestContext(),listener,CompositeModel.FAMILY_PATH ); + final ActorRef subject = + getSystem().actorOf(props, "testDataChangedNotificationsDisabled"); + + new Within(duration("1 seconds")) { + protected void run() { + + subject.tell( + new DataChanged(CompositeModel.createTestContext(),new MockDataChangedEvent()).toSerializable(), + getRef()); + expectNoMsg(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java index 23a1ed4931..03191f70f1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java @@ -54,8 +54,8 @@ public class DistributedDataStoreTest extends AbstractActorTest{ } @org.junit.Test - public void testRegisterChangeListener() throws Exception { - mockActorContext.setExecuteShardOperationResponse(new RegisterChangeListenerReply(doNothingActorRef.path()).toSerializable()); + public void testRegisterChangeListenerWhenShardIsNotLocal() throws Exception { + ListenerRegistration registration = distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener>() { @Override @@ -64,9 +64,31 @@ public class DistributedDataStoreTest extends AbstractActorTest{ } }, AsyncDataBroker.DataChangeScope.BASE); + // Since we do not expect the shard to be local registration will return a NoOpRegistration + Assert.assertTrue(registration instanceof NoOpDataChangeListenerRegistration); + + Assert.assertNotNull(registration); + } + + @org.junit.Test + public void testRegisterChangeListenerWhenShardIsLocal() throws Exception { + + mockActorContext.setExecuteLocalShardOperationResponse(new RegisterChangeListenerReply(doNothingActorRef.path()).toSerializable()); + + ListenerRegistration registration = + distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener>() { + @Override + public void onDataChanged(AsyncDataChangeEvent> change) { + throw new UnsupportedOperationException("onDataChanged"); + } + }, AsyncDataBroker.DataChangeScope.BASE); + + Assert.assertTrue(registration instanceof DataChangeListenerRegistrationProxy); + Assert.assertNotNull(registration); } + @org.junit.Test public void testCreateTransactionChain() throws Exception { final DOMStoreTransactionChain transactionChain = distributedDataStore.createTransactionChain(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 87d257a3f2..268ed3c273 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -1,5 +1,6 @@ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.testkit.JavaTestKit; @@ -8,13 +9,19 @@ import junit.framework.Assert; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; +import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; import scala.concurrent.duration.Duration; +import static junit.framework.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + public class ShardManagerTest { private static ActorSystem system; @@ -47,7 +54,6 @@ public class ShardManagerTest { expectMsgEquals(Duration.Zero(), new PrimaryNotFound("inventory").toSerializable()); - // Will wait for the rest of the 3 seconds expectNoMsg(); } }; @@ -64,7 +70,6 @@ public class ShardManagerTest { final TestActorRef subject = TestActorRef.create(system, props); - // the run() method needs to finish within 3 seconds new Within(duration("1 seconds")) { protected void run() { @@ -78,6 +83,75 @@ public class ShardManagerTest { }}; } + @Test + public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception { + + new JavaTestKit(system) {{ + final Props props = ShardManager + .props("config", new MockClusterWrapper(), + new MockConfiguration()); + final TestActorRef subject = + TestActorRef.create(system, props); + + new Within(duration("1 seconds")) { + protected void run() { + + subject.tell(new FindLocalShard("inventory"), getRef()); + + final String out = new ExpectMsg(duration("1 seconds"), "find local") { + protected String match(Object in) { + if (in instanceof LocalShardNotFound) { + return ((LocalShardNotFound) in).getShardName(); + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + assertEquals("inventory", out); + + expectNoMsg(); + } + }; + }}; + } + + @Test + public void testOnReceiveFindLocalShardForExistentShard() throws Exception { + + final MockClusterWrapper mockClusterWrapper = new MockClusterWrapper(); + + new JavaTestKit(system) {{ + final Props props = ShardManager + .props("config", mockClusterWrapper, + new MockConfiguration()); + final TestActorRef subject = + TestActorRef.create(system, props); + + new Within(duration("1 seconds")) { + protected void run() { + + subject.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef()); + + final ActorRef out = new ExpectMsg(duration("1 seconds"), "find local") { + protected ActorRef match(Object in) { + if (in instanceof LocalShardFound) { + return ((LocalShardFound) in).getPath(); + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + assertTrue(out.path().toString(), out.path().toString().contains("member-1-shard-default-config")); + + + expectNoMsg(); + } + }; + }}; + } + @Test public void testOnReceiveMemberUp() throws Exception { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 7d57ea8284..38920d86ca 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -7,6 +7,7 @@ import org.junit.Test; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply; +import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; @@ -24,6 +25,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import static junit.framework.Assert.assertFalse; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -92,6 +94,19 @@ public class ShardTest extends AbstractActorTest { getRef().path(), AsyncDataBroker.DataChangeScope.BASE).toSerializable(), getRef()); + final Boolean notificationEnabled = new ExpectMsg("enable notification") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if(in instanceof EnableNotification){ + return ((EnableNotification) in).isEnabled(); + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + assertFalse(notificationEnabled); + final String out = new ExpectMsg("match hint") { // do not put code outside this method, will run afterwards protected String match(Object in) { @@ -108,8 +123,6 @@ public class ShardTest extends AbstractActorTest { assertTrue(out.matches( "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*")); - // Will wait for the rest of the 3 seconds - expectNoMsg(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index 3dd0214e9b..5874eccda4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -2,12 +2,20 @@ package org.opendaylight.controller.cluster.datastore.utils; import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.japi.Creator; +import akka.testkit.JavaTestKit; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; +import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; +import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; +import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.mockito.Mockito.mock; public class ActorContextTest extends AbstractActorTest{ @@ -44,4 +52,145 @@ public class ActorContextTest extends AbstractActorTest{ System.out.println(actorContext .actorFor("akka://system/user/shardmanager/shard/transaction")); } + + + private static class MockShardManager extends UntypedActor { + + private final boolean found; + private final ActorRef actorRef; + + private MockShardManager(boolean found, ActorRef actorRef){ + + this.found = found; + this.actorRef = actorRef; + } + + @Override public void onReceive(Object message) throws Exception { + if(found){ + getSender().tell(new LocalShardFound(actorRef), getSelf()); + } else { + getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf()); + } + } + + private static Props props(final boolean found, final ActorRef actorRef){ + return Props.create(new Creator() { + + @Override public MockShardManager create() + throws Exception { + return new MockShardManager(found, + actorRef); + } + }); + } + } + + @Test + public void testExecuteLocalShardOperationWithShardFound(){ + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); + + ActorRef shardManagerActorRef = getSystem() + .actorOf(MockShardManager.props(true, shardActorRef)); + + ActorContext actorContext = + new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), + mock(Configuration.class)); + + Object out = actorContext.executeLocalShardOperation("default", "hello", duration("1 seconds")); + + assertEquals("hello", out); + + + expectNoMsg(); + } + }; + }}; + + } + + @Test + public void testExecuteLocalShardOperationWithShardNotFound(){ + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + ActorRef shardManagerActorRef = getSystem() + .actorOf(MockShardManager.props(false, null)); + + ActorContext actorContext = + new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), + mock(Configuration.class)); + + Object out = actorContext.executeLocalShardOperation("default", "hello", duration("1 seconds")); + + assertNull(out); + + + expectNoMsg(); + } + }; + }}; + + } + + + @Test + public void testFindLocalShardWithShardFound(){ + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); + + ActorRef shardManagerActorRef = getSystem() + .actorOf(MockShardManager.props(true, shardActorRef)); + + ActorContext actorContext = + new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), + mock(Configuration.class)); + + Object out = actorContext.findLocalShard("default"); + + assertEquals(shardActorRef, out); + + + expectNoMsg(); + } + }; + }}; + + } + + @Test + public void testFindLocalShardWithShardNotFound(){ + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + ActorRef shardManagerActorRef = getSystem() + .actorOf(MockShardManager.props(false, null)); + + ActorContext actorContext = + new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), + mock(Configuration.class)); + + Object out = actorContext.findLocalShard("default"); + + assertNull(out); + + + expectNoMsg(); + } + }; + }}; + + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/EchoActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/EchoActor.java new file mode 100644 index 0000000000..fe88afe1db --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/EchoActor.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.utils; + +import akka.actor.UntypedActor; + +/** + * The EchoActor simply responds back with the same message that it receives + */ +public class EchoActor extends UntypedActor{ + + @Override public void onReceive(Object message) throws Exception { + getSender().tell(message, getSelf()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java index 1d1e661488..5d3853f311 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java @@ -19,6 +19,7 @@ public class MockActorContext extends ActorContext { private Object executeShardOperationResponse; private Object executeRemoteOperationResponse; private Object executeLocalOperationResponse; + private Object executeLocalShardOperationResponse; public MockActorContext(ActorSystem actorSystem) { super(actorSystem, null, new MockClusterWrapper(), new MockConfiguration()); @@ -56,8 +57,18 @@ public class MockActorContext extends ActorContext { this.executeLocalOperationResponse = executeLocalOperationResponse; } + public void setExecuteLocalShardOperationResponse( + Object executeLocalShardOperationResponse) { + this.executeLocalShardOperationResponse = executeLocalShardOperationResponse; + } + @Override public Object executeLocalOperation(ActorRef actor, Object message, FiniteDuration duration) { return this.executeLocalOperationResponse; } + + @Override public Object executeLocalShardOperation(String shardName, + Object message, FiniteDuration duration) { + return this.executeLocalShardOperationResponse; + } } -- 2.36.6