import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Creator;
+import org.opendaylight.controller.cluster.datastore.messages.CloseListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseListenerRegistrationReply;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@Override
public void onReceive(Object message) throws Exception {
- throw new UnsupportedOperationException("onReceive");
+ if(message instanceof CloseListenerRegistration){
+ closeListenerRegistration((CloseListenerRegistration) message);
+ }
}
public static Props props(final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration){
}
});
}
+
+ private void closeListenerRegistration(CloseListenerRegistration message){
+ registration.close();
+ getSender().tell(new CloseListenerRegistrationReply(), getSelf());
+ }
}
* A Shard represents a portion of the logical data tree
* <p/>
* Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it
+ *
*/
public class Shard extends UntypedProcessor {
} else if(message instanceof RegisterChangeListener){
registerChangeListener((RegisterChangeListener) message);
} else if(message instanceof UpdateSchemaContext){
- store.onGlobalContextUpdated(((UpdateSchemaContext) message).getSchemaContext());
+ updateSchemaContext((UpdateSchemaContext) message);
}
}
+ private void updateSchemaContext(UpdateSchemaContext message) {
+ store.onGlobalContextUpdated(message.getSchemaContext());
+ }
+
private void registerChangeListener(RegisterChangeListener registerChangeListener) {
org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration =
store.registerChangeListener(registerChangeListener.getPath(), registerChangeListener.getListener(), registerChangeListener.getScope());
- // TODO: Construct a ListenerRegistration actor with the actual registration returned when registering a listener with the datastore
- ActorRef listenerRegistration = getContext().actorOf(ListenerRegistration.props(null));
+ ActorRef listenerRegistration = getContext().actorOf(ListenerRegistration.props(registration));
getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class CloseListenerRegistration {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class CloseListenerRegistrationReply {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.shardstrategy;
+
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+/**
+ * The DefaultShardStrategy basically puts all data into the default Shard
+ * <p>
+ * The default shard stores data for all modules for which a specific set of shards has not been configured
+ * </p>
+ */
+public class DefaultShardStrategy implements ShardStrategy{
+
+ public static final String NAME = "default";
+ public static final String DEFAULT_SHARD = "default";
+
+ @Override
+ public String findShard(InstanceIdentifier path) {
+ return DEFAULT_SHARD;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.shardstrategy;
+
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+/**
+ * The role of ShardStrategy is to figure out which Shards a given piece of data belongs to
+ */
+public interface ShardStrategy {
+ /**
+ * Find the name of the shard in which the data pointed to by the specified path belongs in
+ *
+ * @param path The location of the data in the logical tree
+ * @return
+ */
+ String findShard(InstanceIdentifier path);
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.shardstrategy;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class ShardStrategyFactory {
+ private static final Map<String, ShardStrategy> moduleNameToStrategyMap = new ConcurrentHashMap();
+
+ private static final String UNKNOWN_MODULE_NAME = "unknown";
+
+ public static ShardStrategy getStrategy(InstanceIdentifier path){
+ Preconditions.checkNotNull(path, "path should not be null");
+
+ String moduleName = getModuleName(path);
+ ShardStrategy shardStrategy = moduleNameToStrategyMap.get(moduleName);
+ if(shardStrategy == null){
+ return new DefaultShardStrategy();
+ }
+
+ return shardStrategy;
+ }
+
+
+ private static String getModuleName(InstanceIdentifier path){
+ return UNKNOWN_MODULE_NAME;
+ }
+
+ /**
+ * This is to be used in the future to register a custom shard strategy
+ *
+ * @param moduleName
+ * @param shardStrategy
+ */
+ public static void registerShardStrategy(String moduleName, ShardStrategy shardStrategy){
+ throw new UnsupportedOperationException("registering a custom shard strategy not supported yet");
+ }
+}
import org.junit.AfterClass;
import org.junit.BeforeClass;
-public class AbstractActorTest {
+public abstract class AbstractActorTest {
private static ActorSystem system;
@BeforeClass
--- /dev/null
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.CloseListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseListenerRegistrationReply;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import static org.junit.Assert.assertEquals;
+
+public class ListenerRegistrationTest extends AbstractActorTest {
+ private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
+
+ private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor);
+
+ static {
+ store.onGlobalContextUpdated(TestModel.createTestContext());
+ }
+
+
+ @Test
+ public void testOnReceiveCloseListenerRegistration() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final Props props = ListenerRegistration.props(store.registerChangeListener(TestModel.TEST_PATH, noOpDataChangeListener(), AsyncDataBroker.DataChangeScope.BASE));
+ final ActorRef subject = getSystem().actorOf(props, "testCloseListenerRegistration");
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ subject.tell(new CloseListenerRegistration(), getRef());
+
+ final String out = new ExpectMsg<String>("match hint") {
+ // do not put code outside this method, will run afterwards
+ protected String match(Object in) {
+ if (in instanceof CloseListenerRegistrationReply) {
+ return "match";
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertEquals("match", out);
+
+ expectNoMsg();
+ }
+
+
+ };
+ }};
+ }
+
+ private AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener(){
+ return new AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>() {
+ @Override
+ public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> change) {
+
+ }
+ };
+ }
+
+}
\ No newline at end of file
--- /dev/null
+package org.opendaylight.controller.cluster.datastore.shardstrategy;
+
+import junit.framework.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+
+public class DefaultShardStrategyTest {
+
+ @Test
+ public void testFindShard() throws Exception {
+ String shard = new DefaultShardStrategy().findShard(TestModel.TEST_PATH);
+ Assert.assertEquals(DefaultShardStrategy.DEFAULT_SHARD, shard);
+ }
+}
\ No newline at end of file
--- /dev/null
+package org.opendaylight.controller.cluster.datastore.shardstrategy;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+
+import static junit.framework.Assert.assertNotNull;
+
+public class ShardStrategyFactoryTest {
+
+ @Rule
+ public ExpectedException expectedEx = ExpectedException.none();
+
+ @Test
+ public void testGetStrategy(){
+ ShardStrategy strategy = ShardStrategyFactory.getStrategy(TestModel.TEST_PATH);
+ assertNotNull(strategy);
+ }
+
+ @Test
+ public void testGetStrategyNullPointerExceptionWhenPathIsNull(){
+ expectedEx.expect(NullPointerException.class);
+ expectedEx.expectMessage("path should not be null");
+
+ ShardStrategyFactory.getStrategy(null);
+ }
+
+}
\ No newline at end of file