package org.opendaylight.controller.cluster.datastore;
+import com.google.common.base.Optional;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
+
import java.util.List;
+import java.util.Map;
public interface Configuration {
List<String> getMemberShardNames(String memberName);
+ Optional<String> getModuleNameFromNameSpace(String nameSpace);
+ Map<String, ShardStrategy> getModuleNameToShardStrategyMap();
+ List<String> getShardNamesFromModuleName(String moduleName);
}
package org.opendaylight.controller.cluster.datastore;
+import com.google.common.base.Optional;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigObject;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ModuleShardStrategy;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class ConfigurationImpl implements Configuration {
+
private final List<ModuleShard> moduleShards = new ArrayList<>();
+
private final List<Module> modules = new ArrayList<>();
readModules(modulesConfig);
}
- public List<String> getMemberShardNames(String memberName){
-
+ @Override public List<String> getMemberShardNames(String memberName){
List<String> shards = new ArrayList();
for(ModuleShard ms : moduleShards){
for(Shard s : ms.getShards()){
}
+ @Override public Optional<String> getModuleNameFromNameSpace(String nameSpace) {
+ for(Module m : modules){
+ if(m.getNameSpace().equals(nameSpace)){
+ return Optional.of(m.getName());
+ }
+ }
+ return Optional.absent();
+ }
+
+ @Override public Map<String, ShardStrategy> getModuleNameToShardStrategyMap() {
+ Map<String, ShardStrategy> map = new HashMap<>();
+ for(Module m : modules){
+ map.put(m.getName(), m.getShardStrategy());
+ }
+ return map;
+ }
+
+ @Override public List<String> getShardNamesFromModuleName(String moduleName) {
+ for(ModuleShard m : moduleShards){
+ if(m.getModuleName().equals(moduleName)){
+ List<String> l = new ArrayList<>();
+ for(Shard s : m.getShards()){
+ l.add(s.getName());
+ }
+ return l;
+ }
+ }
+
+ return Collections.EMPTY_LIST;
+ }
+
+
private void readModules(Config modulesConfig) {
List<? extends ConfigObject> modulesConfigObjectList =
for(ConfigObject o : modulesConfigObjectList){
ConfigObjectWrapper w = new ConfigObjectWrapper(o);
modules.add(new Module(w.stringValue("name"), w.stringValue(
- "namespace"), w.stringValue("sharding-strategy")));
+ "namespace"), w.stringValue("shard-strategy")));
}
}
}
- public static class ModuleShard {
+ private class ModuleShard {
private final String moduleName;
private final List<Shard> shards;
}
}
- public static class Shard {
+ private class Shard {
private final String name;
private final List<String> replicas;
}
}
- public static class Module {
+ private class Module {
private final String name;
private final String nameSpace;
- private final String shardingStrategy;
+ private final ShardStrategy shardStrategy;
- Module(String name, String nameSpace, String shardingStrategy) {
+ Module(String name, String nameSpace, String shardStrategy) {
this.name = name;
this.nameSpace = nameSpace;
- this.shardingStrategy = shardingStrategy;
+ if(ModuleShardStrategy.NAME.equals(shardStrategy)){
+ this.shardStrategy = new ModuleShardStrategy(name, ConfigurationImpl.this);
+ } else {
+ this.shardStrategy = new DefaultShardStrategy();
+ }
}
public String getName() {
return nameSpace;
}
- public String getShardingStrategy() {
- return shardingStrategy;
+ public ShardStrategy getShardStrategy() {
+ return shardStrategy;
}
}
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(
DataChangeListener.props(listener));
- Object result = actorContext.executeShardOperation(Shard.DEFAULT_NAME,
+ String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
+
+ Object result = actorContext.executeShardOperation(shardName,
new RegisterChangeListener(path, dataChangeListenerActor.path(),
AsyncDataBroker.DataChangeScope.BASE).toSerializable(),
ActorContext.ASK_DURATION
FindPrimary msg = ((FindPrimary) message);
String shardName = msg.getShardName();
-
if (Shard.DEFAULT_NAME.equals(shardName)) {
ActorPath defaultShardPath = localShards.get(shardName);
if(defaultShardPath == null){
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
this.executor = executor;
this.schemaContext = schemaContext;
- Object response = actorContext.executeShardOperation(Shard.DEFAULT_NAME, new CreateTransaction(identifier), ActorContext.ASK_DURATION);
- if(response instanceof CreateTransactionReply){
- CreateTransactionReply reply = (CreateTransactionReply) response;
- remoteTransactionPaths.put(Shard.DEFAULT_NAME, actorContext.actorSelection(reply.getTransactionActorPath()));
- }
+
}
@Override
public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
+
+ createTransactionIfMissing(actorContext, path);
+
final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
Callable<Optional<NormalizedNode<?,?>>> call = new Callable() {
@Override
public void write(InstanceIdentifier path, NormalizedNode<?, ?> data) {
+
+ createTransactionIfMissing(actorContext, path);
+
final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
remoteTransaction.tell(new WriteData(path, data, schemaContext).toSerializable(), null);
}
@Override
public void merge(InstanceIdentifier path, NormalizedNode<?, ?> data) {
+
+ createTransactionIfMissing(actorContext, path);
+
final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
remoteTransaction.tell(new MergeData(path, data, schemaContext).toSerializable(), null);
}
@Override
public void delete(InstanceIdentifier path) {
+
+ createTransactionIfMissing(actorContext, path);
+
final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
remoteTransaction.tell(new DeleteData(path).toSerializable(), null);
}
}
private String shardNameFromIdentifier(InstanceIdentifier path){
- return Shard.DEFAULT_NAME;
+ return ShardStrategyFactory.getStrategy(path).findShard(path);
}
+
+ private void createTransactionIfMissing(ActorContext actorContext, InstanceIdentifier path) {
+ String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
+
+ ActorSelection actorSelection =
+ remoteTransactionPaths.get(shardName);
+
+ if(actorSelection != null){
+ // A transaction already exists with that shard
+ return;
+ }
+
+ Object response = actorContext.executeShardOperation(shardName, new CreateTransaction(identifier), ActorContext.ASK_DURATION);
+ if(response instanceof CreateTransactionReply){
+ CreateTransactionReply reply = (CreateTransactionReply) response;
+ remoteTransactionPaths.put(shardName, actorContext.actorSelection(reply.getTransactionActorPath()));
+ }
+ }
+
+
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.shardstrategy;
+
+import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+public class ModuleShardStrategy implements ShardStrategy {
+
+ public static final String NAME = "module";
+
+ private final String moduleName;
+ private final Configuration configuration;
+
+ public ModuleShardStrategy(String moduleName, Configuration configuration){
+ this.moduleName = moduleName;
+
+ this.configuration = configuration;
+ }
+
+ @Override public String findShard(InstanceIdentifier path) {
+ return configuration.getShardNamesFromModuleName(moduleName).get(0);
+ }
+}
package org.opendaylight.controller.cluster.datastore.shardstrategy;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ShardStrategyFactory {
- private static final Map<String, ShardStrategy> moduleNameToStrategyMap = new ConcurrentHashMap();
+ private static Map<String, ShardStrategy> moduleNameToStrategyMap =
+ new ConcurrentHashMap();
- private static final String UNKNOWN_MODULE_NAME = "unknown";
+ private static final String UNKNOWN_MODULE_NAME = "unknown";
+ private static Configuration configuration;
- public static ShardStrategy getStrategy(InstanceIdentifier path){
- Preconditions.checkNotNull(path, "path should not be null");
- String moduleName = getModuleName(path);
- ShardStrategy shardStrategy = moduleNameToStrategyMap.get(moduleName);
- if(shardStrategy == null){
- return new DefaultShardStrategy();
+ public static void setConfiguration(Configuration configuration){
+ ShardStrategyFactory.configuration = configuration;
+ moduleNameToStrategyMap = configuration.getModuleNameToShardStrategyMap();
}
- return shardStrategy;
- }
+ public static ShardStrategy getStrategy(InstanceIdentifier path) {
+ Preconditions.checkState(configuration != null, "configuration should not be missing");
+ Preconditions.checkNotNull(path, "path should not be null");
- private static String getModuleName(InstanceIdentifier path){
- return UNKNOWN_MODULE_NAME;
- }
+ String moduleName = getModuleName(path);
+ ShardStrategy shardStrategy = moduleNameToStrategyMap.get(moduleName);
+ if (shardStrategy == null) {
+ return new DefaultShardStrategy();
+ }
- /**
- * This is to be used in the future to register a custom shard strategy
- *
- * @param moduleName
- * @param shardStrategy
- */
- public static void registerShardStrategy(String moduleName, ShardStrategy shardStrategy){
- throw new UnsupportedOperationException("registering a custom shard strategy not supported yet");
- }
+ return shardStrategy;
+ }
+
+
+ private static String getModuleName(InstanceIdentifier path) {
+ String namespace = path.getLastPathArgument().getNodeType().getNamespace()
+ .toASCIIString();
+
+ Optional<String> optional =
+ configuration.getModuleNameFromNameSpace(namespace);
+
+ if(!optional.isPresent()){
+ return UNKNOWN_MODULE_NAME;
+ }
+
+ return optional.get();
+ }
}
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
@Test
public void integrationTest() throws Exception {
+ ShardStrategyFactory.setConfiguration(new MockConfiguration());
DistributedDataStore distributedDataStore =
new DistributedDataStore(getSystem(), "config", new MockClusterWrapper(), new MockConfiguration());
import junit.framework.Assert;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
@org.junit.Before
public void setUp() throws Exception {
+ ShardStrategyFactory.setConfiguration(new MockConfiguration());
final Props props = Props.create(DoNothingActor.class);
doNothingActorRef = getSystem().actorOf(props);
TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+ transactionProxy.read(TestModel.TEST_PATH);
+
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
new TransactionProxy(actorContext,
TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+ transactionProxy.read(TestModel.TEST_PATH);
+
transactionProxy.close();
Object messages = testContext
--- /dev/null
+package org.opendaylight.controller.cluster.datastore.shardstrategy;
+
+import junit.framework.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.ConfigurationImpl;
+import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
+
+public class ModuleShardStrategyTest {
+ @Rule
+ public ExpectedException expectedEx = ExpectedException.none();
+
+ private static Configuration configuration;
+
+ @BeforeClass
+ public static void setUpClass(){
+ configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
+ }
+
+
+ @Test
+ public void testFindShard() throws Exception {
+ ModuleShardStrategy moduleShardStrategy =
+ new ModuleShardStrategy("cars", configuration);
+
+ String shard = moduleShardStrategy.findShard(CarsModel.BASE_PATH);
+
+ Assert.assertEquals("cars-1", shard);
+ }
+}
package org.opendaylight.controller.cluster.datastore.shardstrategy;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import org.opendaylight.controller.cluster.datastore.ConfigurationImpl;
+import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
public class ShardStrategyFactoryTest {
- @Rule
- public ExpectedException expectedEx = ExpectedException.none();
+ @Rule
+ public ExpectedException expectedEx = ExpectedException.none();
- @Test
- public void testGetStrategy(){
- ShardStrategy strategy = ShardStrategyFactory.getStrategy(TestModel.TEST_PATH);
- assertNotNull(strategy);
- }
+ @BeforeClass
+ public static void setUpClass(){
+ ShardStrategyFactory.setConfiguration(new ConfigurationImpl("module-shards.conf", "modules.conf"));
+ }
- @Test
- public void testGetStrategyNullPointerExceptionWhenPathIsNull(){
- expectedEx.expect(NullPointerException.class);
- expectedEx.expectMessage("path should not be null");
+ @Test
+ public void testGetStrategy() {
+ ShardStrategy strategy =
+ ShardStrategyFactory.getStrategy(TestModel.TEST_PATH);
+ assertNotNull(strategy);
+ }
- ShardStrategyFactory.getStrategy(null);
- }
+ @Test
+ public void testGetStrategyForKnownModuleName() {
+ ShardStrategy strategy =
+ ShardStrategyFactory.getStrategy(InstanceIdentifier.of(CarsModel.BASE_QNAME));
+ assertTrue(strategy instanceof ModuleShardStrategy);
+ }
-}
\ No newline at end of file
+
+ @Test
+ public void testGetStrategyNullPointerExceptionWhenPathIsNull() {
+ expectedEx.expect(NullPointerException.class);
+ expectedEx.expectMessage("path should not be null");
+
+ ShardStrategyFactory.getStrategy(null);
+ }
+
+}
package org.opendaylight.controller.cluster.datastore.utils;
+import com.google.common.base.Optional;
import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
public class MockConfiguration implements Configuration{
@Override public List<String> getMemberShardNames(String memberName) {
shardNames.add("default");
return shardNames;
}
+
+ @Override public Optional<String> getModuleNameFromNameSpace(
+ String nameSpace) {
+ return Optional.absent();
+ }
+
+ @Override
+ public Map<String, ShardStrategy> getModuleNameToShardStrategyMap() {
+ return Collections.EMPTY_MAP;
+ }
+
+ @Override public List<String> getShardNamesFromModuleName(
+ String moduleName) {
+ return Collections.EMPTY_LIST;
+ }
}
public static final QName BASE_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13",
"cars");
+ public static final InstanceIdentifier BASE_PATH = InstanceIdentifier.of(BASE_QNAME);
+
public static final QName CARS_QNAME = QName.create(BASE_QNAME, "cars");
public static final QName CAR_QNAME = QName.create(CARS_QNAME, "car");
public static final QName CAR_NAME_QNAME = QName.create(CAR_QNAME, "name");
{
name = "people"
namespace = "urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:people"
- sharding-strategy = "module"
+ shard-strategy = "module"
},
{
name = "cars"
namespace = "urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars"
- sharding-strategy = "module"
+ shard-strategy = "module"
}
]