Merge "Fix usage of StringBuilder when getting parent path in PathUtils Making consta...
authorMoiz Raja <moraja@cisco.com>
Sat, 9 Aug 2014 02:05:52 +0000 (02:05 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Sat, 9 Aug 2014 02:05:52 +0000 (02:05 +0000)
133 files changed:
opendaylight/commons/opendaylight/pom.xml
opendaylight/config/config-manager/src/main/java/org/opendaylight/controller/config/manager/impl/osgi/ConfigManagerActivator.java
opendaylight/config/config-manager/src/test/java/org/opendaylight/controller/config/manager/impl/runtimembean/RuntimeBeanRegistratorImplTest.java
opendaylight/config/config-manager/src/test/java/org/opendaylight/controller/config/manager/testingservices/parallelapsp/test/DependentWiringTest.java
opendaylight/config/yang-jmx-generator/pom.xml
opendaylight/config/yang-jmx-generator/src/test/java/org/opendaylight/controller/config/yangjmxgenerator/RuntimeBeanEntryTest.java
opendaylight/distribution/opendaylight/pom.xml
opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/NodeMapping.java
opendaylight/md-sal/compatibility/sal-compatibility/src/test/java/org/opendaylight/controller/sal/compatibility/test/NodeMappingTest.java
opendaylight/md-sal/md-sal-config/src/main/resources/initial/01-md-sal.xml
opendaylight/md-sal/pom.xml
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/ForwardedBackwardsCompatibleDataBroker.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/RpcRouterCodegenInstance.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/DataTransactionImpl.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/MountPointManagerImpl.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/RpcProviderRegistryImpl.java
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/util/MockSchemaService.java
opendaylight/md-sal/sal-clustering-config/pom.xml [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/05-clustering.xml.conf [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/module-shards.conf [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/modules.conf [new file with mode: 0644]
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ActorSystemFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMBean.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ModuleShardStrategy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/resources/application.conf
opendaylight/md-sal/sal-distributed-datastore/src/main/resources/modules.conf
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ModuleShardStrategyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/modules.conf
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcImplementationUnavailableException.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/mount/DOMMountPointServiceImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/BackwardsCompatibleMountPointManager.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/DataTransactionImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/MountPointManagerImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/RoutedRpcSelector.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareRpcBroker.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ResolveDataChangeEventsTask.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfStateSchemas.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicator.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/NetconfDeviceReadOnlyTx.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/schema/mapping/NetconfMessageTransformer.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/util/NetconfMessageTransformUtil.java
opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTest.java
opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfStateSchemasTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/test/resources/netconf-state.schemas.payload.xml [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/pom.xml
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorSystemFactory.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableOld.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryOld.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Copier.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/application.conf
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableOldTest.java [moved from opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableTest.java with 97% similarity]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryOldTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossiperTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf [new file with mode: 0644]
opendaylight/networkconfiguration/neutron/implementation/src/main/java/org/opendaylight/controller/networkconfig/neutron/implementation/Activator.java
opendaylight/networkconfiguration/neutron/implementation/src/main/java/org/opendaylight/controller/networkconfig/neutron/implementation/NeutronLoadBalancerHealthMonitorInterface.java [new file with mode: 0644]
opendaylight/networkconfiguration/neutron/implementation/src/main/java/org/opendaylight/controller/networkconfig/neutron/implementation/NeutronLoadBalancerInterface.java [new file with mode: 0644]
opendaylight/networkconfiguration/neutron/implementation/src/main/java/org/opendaylight/controller/networkconfig/neutron/implementation/NeutronLoadBalancerListenerInterface.java [new file with mode: 0644]
opendaylight/networkconfiguration/neutron/implementation/src/main/java/org/opendaylight/controller/networkconfig/neutron/implementation/NeutronLoadBalancerPoolInterface.java [new file with mode: 0644]
opendaylight/networkconfiguration/neutron/implementation/src/main/java/org/opendaylight/controller/networkconfig/neutron/implementation/NeutronLoadBalancerPoolMemberInterface.java [new file with mode: 0644]
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronLoadBalancerAware.java [new file with mode: 0644]
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronLoadBalancerCRUD.java [new file with mode: 0644]
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronLoadBalancerHealthMonitorAware.java [new file with mode: 0644]
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronLoadBalancerHealthMonitorCRUD.java [new file with mode: 0644]
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronLoadBalancerListenerAware.java [new file with mode: 0644]
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronLoadBalancerListenerCRUD.java [new file with mode: 0644]
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronLoadBalancerPoolAware.java [new file with mode: 0644]
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronLoadBalancerPoolCRUD.java [new file with mode: 0644]
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronLoadBalancerPoolMemberAware.java [new file with mode: 0644]
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronLoadBalancerPoolMemberCRUD.java [new file with mode: 0644]
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronLoadBalancerPoolMemberRequest.java [new file with mode: 0644]
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/NeutronCRUDInterfaces.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/NeutronLoadBalancer.java [new file with mode: 0644]
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/NeutronLoadBalancerHealthMonitor.java [new file with mode: 0644]
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/NeutronLoadBalancerListener.java [new file with mode: 0644]
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/NeutronLoadBalancerPool.java [new file with mode: 0644]
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/NeutronLoadBalancerPoolMember.java [new file with mode: 0644]
opendaylight/northbound/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/northbound/INeutronLoadBalancerPoolMemberRequest.java [new file with mode: 0644]
opendaylight/northbound/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/northbound/NeutronLoadBalancerHealthMonitorNorthbound.java [new file with mode: 0644]
opendaylight/northbound/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/northbound/NeutronLoadBalancerHealthMonitorRequest.java [new file with mode: 0644]
opendaylight/northbound/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/northbound/NeutronLoadBalancerListenerNorthbound.java [new file with mode: 0644]
opendaylight/northbound/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/northbound/NeutronLoadBalancerListenerRequest.java [new file with mode: 0644]
opendaylight/northbound/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/northbound/NeutronLoadBalancerNorthbound.java [new file with mode: 0644]
opendaylight/northbound/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/northbound/NeutronLoadBalancerPoolMembersNorthbound.java [new file with mode: 0644]
opendaylight/northbound/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/northbound/NeutronLoadBalancerPoolNorthbound.java [new file with mode: 0644]
opendaylight/northbound/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/northbound/NeutronLoadBalancerPoolRequest.java [new file with mode: 0644]
opendaylight/northbound/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/northbound/NeutronLoadBalancerRequest.java [new file with mode: 0644]
opendaylight/northbound/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/northbound/NeutronNorthboundRSApplication.java

index 6b17011..8797c7e 100644 (file)
     <jsr305.api.version>2.0.1</jsr305.api.version>
     <jsr311.api.version>1.1.1</jsr311.api.version>
     <jsr311.v2.api.version>2.0</jsr311.v2.api.version>
-    <junit.version>4.8.1</junit.version>
     <karaf.branding.version>1.0.0-SNAPSHOT</karaf.branding.version>
     <karaf.shell.version>3.0.0</karaf.shell.version>
     <karaf.version>3.0.1</karaf.version>
             <artifactId>akka-osgi_${scala.version}</artifactId>
             <version>${akka.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-slf4j_${scala.version}</artifactId>
+            <version>${akka.version}</version>
+        </dependency>
       <dependency>
         <groupId>commons-codec</groupId>
         <artifactId>commons-codec</artifactId>
             <artifactId>sal-clustering-commons</artifactId>
             <version>${mdsal.version}</version>
         </dependency>
+      <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>sal-clustering-config</artifactId>
+        <version>${mdsal.version}</version>
+      </dependency>
 
       <dependency>
         <groupId>org.opendaylight.controller</groupId>
         <artifactId>slf4j-simple</artifactId>
         <version>${slf4j.version}</version>
       </dependency>
-      <dependency>
-        <groupId>junit</groupId>
-        <artifactId>junit</artifactId>
-        <version>${junit.version}</version>
-        <scope>test</scope>
-      </dependency>
       <dependency>
         <groupId>org.opendaylight.controller</groupId>
         <artifactId>commons.logback_settings</artifactId>
index 6381836..828fcb0 100644 (file)
@@ -7,6 +7,14 @@
  */
 package org.opendaylight.controller.config.manager.impl.osgi;
 
+import static org.opendaylight.controller.config.manager.impl.util.OsgiRegistrationUtil.registerService;
+import static org.opendaylight.controller.config.manager.impl.util.OsgiRegistrationUtil.wrap;
+
+import java.lang.management.ManagementFactory;
+import java.util.Arrays;
+import java.util.List;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.MBeanServer;
 import org.opendaylight.controller.config.manager.impl.ConfigRegistryImpl;
 import org.opendaylight.controller.config.manager.impl.jmx.ConfigRegistryJMXRegistrator;
 import org.opendaylight.controller.config.manager.impl.osgi.mapping.CodecRegistryProvider;
@@ -14,27 +22,19 @@ import org.opendaylight.controller.config.manager.impl.osgi.mapping.ModuleInfoBu
 import org.opendaylight.controller.config.manager.impl.osgi.mapping.RefreshingSCPModuleInfoRegistry;
 import org.opendaylight.controller.config.manager.impl.util.OsgiRegistrationUtil;
 import org.opendaylight.controller.config.spi.ModuleFactory;
+import org.opendaylight.yangtools.sal.binding.generator.impl.GeneratedClassLoadingStrategy;
 import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
 import org.osgi.framework.BundleActivator;
 import org.osgi.framework.BundleContext;
 import org.osgi.util.tracker.ServiceTracker;
 
-import javax.management.InstanceAlreadyExistsException;
-import javax.management.MBeanServer;
-import java.lang.management.ManagementFactory;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.opendaylight.controller.config.manager.impl.util.OsgiRegistrationUtil.registerService;
-import static org.opendaylight.controller.config.manager.impl.util.OsgiRegistrationUtil.wrap;
-
 public class ConfigManagerActivator implements BundleActivator {
     private final MBeanServer configMBeanServer = ManagementFactory.getPlatformMBeanServer();
 
     private AutoCloseable autoCloseable;
 
     @Override
-    public void start(BundleContext context) {
+    public void start(final BundleContext context) {
 
         ModuleInfoBackedContext moduleInfoBackedContext = ModuleInfoBackedContext.create();// the inner strategy is backed by thread context cl?
 
@@ -63,6 +63,7 @@ public class ConfigManagerActivator implements BundleActivator {
         bundleTracker.open();
 
         // register config registry to OSGi
+        AutoCloseable clsReg = registerService(context, moduleInfoBackedContext, GeneratedClassLoadingStrategy.class);
         AutoCloseable configRegReg = registerService(context, configRegistry, ConfigRegistryImpl.class);
 
         // register config registry to jmx
@@ -79,12 +80,12 @@ public class ConfigManagerActivator implements BundleActivator {
         serviceTracker.open();
 
         List<AutoCloseable> list = Arrays.asList(
-                codecRegistryProvider, configRegistry, wrap(bundleTracker), configRegReg, configRegistryJMXRegistrator, wrap(serviceTracker));
+                codecRegistryProvider, clsReg,configRegistry, wrap(bundleTracker), configRegReg, configRegistryJMXRegistrator, wrap(serviceTracker));
         autoCloseable = OsgiRegistrationUtil.aggregate(list);
     }
 
     @Override
-    public void stop(BundleContext context) throws Exception {
+    public void stop(final BundleContext context) throws Exception {
         autoCloseable.close();
     }
 }
index ce3648d..16de005 100644 (file)
@@ -7,10 +7,12 @@
  */
 package org.opendaylight.controller.config.manager.impl.runtimembean;
 
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
-import static org.junit.internal.matchers.StringContains.containsString;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 
 import java.lang.management.ManagementFactory;
 import java.util.Map;
@@ -29,9 +31,6 @@ import org.opendaylight.controller.config.manager.impl.jmx.BaseJMXRegistrator;
 import org.opendaylight.controller.config.manager.impl.jmx.HierarchicalRuntimeBeanRegistrationImpl;
 import org.opendaylight.controller.config.manager.impl.jmx.RootRuntimeBeanRegistratorImpl;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-
 public class RuntimeBeanRegistratorImplTest extends
         AbstractLockedPlatformMBeanServerTest {
     static final String module1 = "module1";
@@ -60,11 +59,11 @@ public class RuntimeBeanRegistratorImplTest extends
         assertEquals(0, baseJMXRegistrator.getRegisteredObjectNames().size());
     }
 
-    protected void checkExists(ObjectName on) throws Exception {
+    protected void checkExists(final ObjectName on) throws Exception {
         platformMBeanServer.getMBeanInfo(on);
     }
 
-    protected void checkNotExists(ObjectName on) throws Exception {
+    protected void checkNotExists(final ObjectName on) throws Exception {
         try {
             platformMBeanServer.getMBeanInfo(on);
             fail();
@@ -98,7 +97,7 @@ public class RuntimeBeanRegistratorImplTest extends
     }
 
     private HierarchicalRuntimeBeanRegistration createAdditional(
-            HierarchicalRuntimeBeanRegistrationImpl rootRegistration)
+            final HierarchicalRuntimeBeanRegistrationImpl rootRegistration)
             throws Exception {
 
         HierarchicalRuntimeBeanRegistrationImpl registration = rootRegistration
index c9810d0..165a6c7 100644 (file)
@@ -7,14 +7,16 @@
  */
 package org.opendaylight.controller.config.manager.testingservices.parallelapsp.test;
 
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.junit.internal.matchers.StringContains.containsString;
 
 import java.util.Map;
+
 import javax.management.ObjectName;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
index fbae840..133b07f 100644 (file)
       <artifactId>yang-parser-impl</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-       <groupId>org.hamcrest</groupId>
-       <artifactId>hamcrest-core</artifactId>
-       <version>1.1</version>
-       <scope>test</scope>
-     </dependency>
   </dependencies>
 
   <build>
index b570302..e80ebc6 100644 (file)
@@ -7,7 +7,22 @@
  */
 package org.opendaylight.controller.config.yangjmxgenerator;
 
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import javax.management.openmbean.SimpleType;
+
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.opendaylight.controller.config.yangjmxgenerator.attribute.JavaAttribute;
@@ -18,20 +33,6 @@ import org.opendaylight.yangtools.yang.model.api.IdentitySchemaNode;
 import org.opendaylight.yangtools.yang.model.api.LeafSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.UnknownSchemaNode;
 
-import javax.management.openmbean.SimpleType;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-
-import static junit.framework.Assert.assertNotNull;
-import static junit.framework.Assert.assertNull;
-import static org.hamcrest.CoreMatchers.is;
-
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.doReturn;
-
 public class RuntimeBeanEntryTest extends AbstractYangTest {
 
     public static final String PACKAGE_NAME = "packages.sis";
@@ -54,10 +55,10 @@ public class RuntimeBeanEntryTest extends AbstractYangTest {
         Map<String, RuntimeBeanEntry> runtimeBeans = RuntimeBeanEntry
                 .extractClassNameToRuntimeBeanMap(PACKAGE_NAME, caseNode, "test-name", new TypeProviderWrapper(new
                         TypeProviderImpl(context)), "test", jmxImplModule);
-        assertThat(runtimeBeans.size(), is(1));
+        assertEquals(1, runtimeBeans.size());
         RuntimeBeanEntry runtimeMXBean = runtimeBeans.get("testRuntimeMXBean");
-        assertThat(runtimeMXBean.isRoot(), is(true));
-        assertThat(runtimeMXBean.getYangName(), is("test-name"));
+        assertTrue(runtimeMXBean.isRoot());
+        assertEquals("test-name", runtimeMXBean.getYangName());
     }
 
     @Test
@@ -72,7 +73,7 @@ public class RuntimeBeanEntryTest extends AbstractYangTest {
                 threadsJavaModule, modulesToSIEs, context,
                 new TypeProviderWrapper(new TypeProviderImpl(context)),
                 PACKAGE_NAME);
-        assertThat(namesToMBEs.isEmpty(), is(false));
+        assertFalse(namesToMBEs.isEmpty());
 
         // get threadfactory-naming bean
         ModuleMXBeanEntry threadfactoryNamingMXBean = namesToMBEs
@@ -82,13 +83,13 @@ public class RuntimeBeanEntryTest extends AbstractYangTest {
         // get runtime beans
         Collection<RuntimeBeanEntry> runtimeBeanEntries = threadfactoryNamingMXBean
                 .getRuntimeBeans();
-        assertThat(runtimeBeanEntries.isEmpty(), is(false));
+        assertFalse(runtimeBeanEntries.isEmpty());
 
         // get root runtime bean
         RuntimeBeanEntry threadfactoryRuntimeBeanEntry = getRuntimeBeanEntryByJavaName(
                 runtimeBeanEntries, "NamingThreadFactoryRuntimeMXBean");
         assertNotNull(threadfactoryRuntimeBeanEntry);
-        assertThat(threadfactoryRuntimeBeanEntry.isRoot(), is(true));
+        assertTrue(threadfactoryRuntimeBeanEntry.isRoot());
 
         // get thread runtime bean
         RuntimeBeanEntry runtimeBeanEntry = getRuntimeBeanEntryByJavaName(
@@ -96,43 +97,41 @@ public class RuntimeBeanEntryTest extends AbstractYangTest {
         assertNotNull(runtimeBeanEntry);
 
         // test thread runtime bean properties
-        assertThat(runtimeBeanEntry.getJavaNamePrefix(),
-                is(THREAD_RUNTIME_BEAN_JAVA_PREFIX));
-        assertThat(runtimeBeanEntry.getPackageName(), is(PACKAGE_NAME));
-        assertThat(runtimeBeanEntry.getFullyQualifiedName(runtimeBeanEntry
-                .getJavaNameOfRuntimeMXBean()), is(PACKAGE_NAME + "."
-                + THREAD_RUNTIME_BEAN_JAVA_NAME));
-        assertThat(runtimeBeanEntry.getYangName(),
-                is(THREAD_RUNTIME_BEAN_YANG_NAME));
+        assertEquals(THREAD_RUNTIME_BEAN_JAVA_PREFIX, runtimeBeanEntry.getJavaNamePrefix());
+        assertEquals(PACKAGE_NAME, runtimeBeanEntry.getPackageName());
+        assertEquals(PACKAGE_NAME + "." + THREAD_RUNTIME_BEAN_JAVA_NAME,
+            runtimeBeanEntry.getFullyQualifiedName(runtimeBeanEntry
+                .getJavaNameOfRuntimeMXBean()));
+        assertEquals(THREAD_RUNTIME_BEAN_YANG_NAME, runtimeBeanEntry.getYangName());
 
         // get thread runtime bean rpcs
         List<RuntimeBeanEntry.Rpc> rpcs = new ArrayList<RuntimeBeanEntry.Rpc>(
                 runtimeBeanEntry.getRpcs());
-        assertThat(rpcs.size(), is(2));
+        assertEquals(2, rpcs.size());
 
         // get sleep rpc and test it
         RuntimeBeanEntry.Rpc rpc = getRpcByName(rpcs, SLEEP_RPC_NAME);
         assertNotNull(rpc);
-        assertThat(rpc.getYangName(), is(SLEEP_RPC_NAME));
+        assertEquals(SLEEP_RPC_NAME, rpc.getYangName());
 
-        assertThat(((JavaAttribute)rpc.getReturnType()).getType().getFullyQualifiedName().endsWith(SLEEP_RPC_OUTPUT),  is(true));
+        assertTrue(((JavaAttribute)rpc.getReturnType()).getType().getFullyQualifiedName().endsWith(SLEEP_RPC_OUTPUT));
 
         // get sleep rpc input attribute and test it
         List<JavaAttribute> attributes = rpc.getParameters();
-        assertThat(attributes.size(), is(1));
+        assertEquals(1, attributes.size());
         JavaAttribute attribute = attributes.get(0);
-        assertThat(attribute.getAttributeYangName(), is(SLEEP_RPC_INPUT_NAME));
-        assertThat(attribute.getType().getName(), is(SLEEP_RPC_INPUT_TYPE));
-        assertThat(attribute.getLowerCaseCammelCase(), is(SLEEP_RPC_INPUT_NAME));
-        assertThat(attribute.getUpperCaseCammelCase(), is("Millis"));
+        assertEquals(SLEEP_RPC_INPUT_NAME, attribute.getAttributeYangName());
+        assertEquals(SLEEP_RPC_INPUT_TYPE, attribute.getType().getName());
+        assertEquals(SLEEP_RPC_INPUT_NAME, attribute.getLowerCaseCammelCase());
+        assertEquals("Millis", attribute.getUpperCaseCammelCase());
         assertNull(attribute.getNullableDefault());
         assertNull(attribute.getNullableDescription());
-        assertThat(attribute.getOpenType(), is(SimpleType.class));
+        assertTrue(attribute.getOpenType() instanceof SimpleType);
     }
 
     private RuntimeBeanEntry getRuntimeBeanEntryByJavaName(
             final Collection<RuntimeBeanEntry> runtimeBeanEntries,
-            String javaName) {
+            final String javaName) {
         if (runtimeBeanEntries != null && !runtimeBeanEntries.isEmpty()) {
             for (RuntimeBeanEntry runtimeBeanEntry : runtimeBeanEntries) {
                 if (runtimeBeanEntry.getJavaNameOfRuntimeMXBean().equals(
@@ -145,7 +144,7 @@ public class RuntimeBeanEntryTest extends AbstractYangTest {
     }
 
     private RuntimeBeanEntry.Rpc getRpcByName(
-            final List<RuntimeBeanEntry.Rpc> rpcs, String name) {
+            final List<RuntimeBeanEntry.Rpc> rpcs, final String name) {
         if (rpcs != null && !rpcs.isEmpty()) {
             for (RuntimeBeanEntry.Rpc rpc : rpcs) {
                 if (rpc.getName().equals(name)) {
index b02835e..7ab56e6 100644 (file)
             <phase>generate-resources</phase>
             <configuration>
                <outputDirectory>${project.build.directory}/configuration</outputDirectory>
-               <includeArtifactIds>sal-rest-connector-config,config-netty-config,md-sal-config,netconf-config,toaster-config,netconf-connector-config</includeArtifactIds>
-               <includes>**\/*.xml</includes>
+               <includeArtifactIds>sal-rest-connector-config,config-netty-config,md-sal-config,netconf-config,toaster-config,netconf-connector-config,sal-clustering-config</includeArtifactIds>
+               <includes>**\/*.xml,**/*.conf</includes>
                <excludeTransitive>true</excludeTransitive>
                <ignorePermissions>false</ignorePermissions>
             </configuration>
           <artifactId>jeromq</artifactId>
           <version>0.3.1</version>
         </dependency>
-          <dependency>
-              <groupId>org.opendaylight.controller</groupId>
-              <artifactId>sal-distributed-datastore</artifactId>
-          </dependency>
+        <dependency>
+          <groupId>org.opendaylight.controller</groupId>
+          <artifactId>sal-distributed-datastore</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.opendaylight.controller</groupId>
+          <artifactId>sal-clustering-config</artifactId>
+        </dependency>
       </dependencies>
     </profile>
     <profile>
index 02964c6..be087ab 100644 (file)
@@ -7,11 +7,13 @@
  */
 package org.opendaylight.controller.sal.compatibility;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
 import java.math.BigInteger;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
-
 import org.opendaylight.controller.sal.common.util.Arguments;
 import org.opendaylight.controller.sal.core.AdvertisedBandwidth;
 import org.opendaylight.controller.sal.core.Bandwidth;
@@ -62,16 +64,14 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-
 public final class NodeMapping {
 
     private static final Logger LOG = LoggerFactory
             .getLogger(NodeMapping.class);
 
-    /** openflow id prefix */
+    /**
+     * openflow id prefix
+     */
     public static final String OPENFLOW_ID_PREFIX = "openflow:";
 
     public final static String MD_SAL_TYPE = "MD_SAL_DEPRECATED";
@@ -90,8 +90,14 @@ public final class NodeMapping {
     }
 
     public static org.opendaylight.controller.sal.core.Node toADNode(final NodeId id) throws ConstructionException {
-        Long aDNodeId = openflowFullNodeIdToLong(NodeMapping.toADNodeId(id));
-        return new org.opendaylight.controller.sal.core.Node(NodeIDType.OPENFLOW, aDNodeId);
+        String nodeId = NodeMapping.toADNodeId(id);
+        String nodeIdasNumber = nodeId.replaceFirst("^.*:", "");
+        if (isInteger(nodeIdasNumber)) {
+            Long aDNodeId = openflowFullNodeIdToLong(nodeIdasNumber);
+            return new org.opendaylight.controller.sal.core.Node(NodeIDType.OPENFLOW, aDNodeId);
+        } else {
+            return new org.opendaylight.controller.sal.core.Node(NodeIDType.PRODUCTION, nodeId);
+        }
     }
 
     /**
@@ -103,7 +109,7 @@ public final class NodeMapping {
         if (adNodeId == null) {
             return null;
         }
-        return new BigInteger(adNodeId.replaceFirst("^.*:", "")).longValue();
+        return new BigInteger(adNodeId).longValue();
     }
 
     public static NodeId toNodeId(final InstanceIdentifier<?> id) {
@@ -137,8 +143,8 @@ public final class NodeMapping {
     }
 
     /**
-     * @param ncid nodeConnector identifier, e.g.: OF:21 or CTRL
-     * @param node
+     * @param ncid   nodeConnector identifier, e.g.: OF:21 or CTRL
+     * @param aDNode
      * @return nodeConnector attached to given node
      * @throws ConstructionException
      */
@@ -155,7 +161,7 @@ public final class NodeMapping {
      * @return
      */
     private static NodeId toNodeId(org.opendaylight.controller.sal.core.Node aDNode) {
-        return new NodeId(aDNode.getType() + ":" +String.valueOf(aDNode.getID()));
+        return new NodeId(aDNode.getType() + ":" + String.valueOf(aDNode.getID()));
     }
 
     public static String toNodeConnectorType(final NodeConnectorId ncId, final NodeId nodeId) {
@@ -212,7 +218,7 @@ public final class NodeMapping {
     public static NodeRef toNodeRef(final org.opendaylight.controller.sal.core.Node node) {
         Preconditions.checkArgument(NodeIDType.OPENFLOW.equals(node.getType()));
         final Long nodeId = Arguments.<Long>checkInstanceOf(node.getID(), Long.class);
-        final NodeKey nodeKey = new NodeKey(new NodeId(OPENFLOW_ID_PREFIX+nodeId));
+        final NodeKey nodeKey = new NodeKey(new NodeId(OPENFLOW_ID_PREFIX + nodeId));
         final InstanceIdentifier<Node> nodePath = InstanceIdentifier.builder(Nodes.class).child(NODE_CLASS, nodeKey).toInstance();
         return new NodeRef(nodePath);
     }
@@ -257,7 +263,7 @@ public final class NodeMapping {
     }
 
     /**
-     * @param id
+     * @param nodeRef
      * @return node description in AD form, e.g.: OF|00:00:00:...:01
      */
     private static Description toADDescription(NodeRef nodeRef) {
@@ -463,4 +469,17 @@ public final class NodeMapping {
     public static Buffers toADBuffers(final Long buffers) {
         return new Buffers(buffers.intValue());
     }
+
+
+    private static final boolean isInteger(String value) {
+        if (value.isEmpty()) return false;
+        for (int i = 0; i < value.length(); i++) {
+            if (i == 0 && value.charAt(i) == '-') {
+                if (value.length() == 1) return false;
+                else continue;
+            }
+            if (Character.digit(value.charAt(i), 10) < 0) return false;
+        }
+        return true;
+    }
 }
index cef7ae7..a776ef2 100644 (file)
@@ -101,6 +101,16 @@ public class NodeMappingTest {
         } catch (ConstructionException e) {
             Assert.fail("should succeed to construct Node: "+e.getMessage());
         }
+
+        final String nodeUriPrefix = "opendaylight-inventory:nodes/node/";
+        nodeId = new NodeId(nodeUriPrefix + "iosv-2");
+        try {
+            observed = NodeMapping.toADNode(nodeId);
+            Assert.assertEquals("PR|opendaylight-inventory:nodes/node/iosv-2", observed.toString());
+        } catch (ConstructionException e) {
+            Assert.fail("should succeed to construct Node: "+e.getMessage());
+        }
+
     }
 
     /**
index f25b7d9..35a7766 100644 (file)
                 <module>
                     <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:distributed-datastore-provider">prefix:distributed-operational-datastore-provider</type>
                     <name>distributed-operational-store-module</name>
-                    <schema-service>
+                    <operational-schema-service>
                         <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:schema-service</type>
                         <name>yang-schema-service</name>
-                    </schema-service>
+                    </operational-schema-service>
                 </module>
 
                 <module>
                     <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:distributed-datastore-provider">prefix:distributed-config-datastore-provider</type>
                     <name>distributed-config-store-module</name>
-                    <schema-service>
+                    <configschema-service>
                         <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:schema-service</type>
                         <name>yang-schema-service</name>
-                    </schema-service>
+                    </config-schema-service>
                 </module>
                 -->
 
index fd828dc..6c6760d 100644 (file)
@@ -69,6 +69,9 @@
     <!--sal-protocolbuffer-encoding is now part of sal-clutering-commons-->
     <module>sal-clustering-commons</module>
 
+    <!-- sal clustering configuration -->
+    <module>sal-clustering-config</module>
+
     <!-- sal-distributed-datastore -->
     <module>sal-distributed-datastore</module>
 
index 24bfa3d..b5b034a 100644 (file)
@@ -100,16 +100,26 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     @Override
     public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
+        return getFrom(logEntryIndex, journal.size());
+    }
+
+    @Override
+    public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int max) {
         int adjustedIndex = adjustedIndex(logEntryIndex);
         int size = journal.size();
         List<ReplicatedLogEntry> entries = new ArrayList<>(100);
         if (adjustedIndex >= 0 && adjustedIndex < size) {
             // physical index should be less than list size and >= 0
-            entries.addAll(journal.subList(adjustedIndex, size));
+            int maxIndex = adjustedIndex + max;
+            if(maxIndex > size){
+                maxIndex = size;
+            }
+            entries.addAll(journal.subList(adjustedIndex, maxIndex));
         }
         return entries;
     }
 
+
     @Override
     public long size() {
        return journal.size();
index c633337..6432fa4 100644 (file)
@@ -33,7 +33,7 @@ public class DefaultConfigParamsImpl implements ConfigParams {
      * Since this is set to 100 milliseconds the Election timeout should be
      * at least 200 milliseconds
      */
-    protected static final FiniteDuration HEART_BEAT_INTERVAL =
+    public static final FiniteDuration HEART_BEAT_INTERVAL =
         new FiniteDuration(100, TimeUnit.MILLISECONDS);
 
 
@@ -51,7 +51,7 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     @Override
     public FiniteDuration getElectionTimeOutInterval() {
         // returns 2 times the heart beat interval
-        return HEART_BEAT_INTERVAL.$times(2);
+        return getHeartBeatInterval().$times(2);
     }
 
     @Override
index caa0e50..0a979d2 100644 (file)
@@ -148,6 +148,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
                 replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
                 replicatedLog.snapshotTerm, replicatedLog.size());
             currentBehavior = switchBehavior(RaftState.Follower);
+            onStateChanged();
         }
     }
 
@@ -206,7 +207,11 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
             RaftState state =
                 currentBehavior.handleMessage(getSender(), message);
+            RaftActorBehavior oldBehavior = currentBehavior;
             currentBehavior = switchBehavior(state);
+            if(oldBehavior != currentBehavior){
+                onStateChanged();
+            }
         }
     }
 
@@ -271,9 +276,21 @@ public abstract class RaftActor extends UntypedPersistentActor {
         String peerAddress = context.getPeerAddress(leaderId);
         LOG.debug("getLeader leaderId = " + leaderId + " peerAddress = "
             + peerAddress);
+
+        if(peerAddress == null){
+            return null;
+        }
         return context.actorSelection(peerAddress);
     }
 
+    /**
+     *
+     * @return the current leader's id
+     */
+    protected String getLeaderId(){
+        return currentBehavior.getLeaderId();
+    }
+
     protected RaftState getRaftState() {
         return currentBehavior.state();
     }
@@ -375,7 +392,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
             behavior = new Leader(context);
         }
 
-        onStateChanged();
+
 
         return behavior;
     }
index b7c8955..e6e160b 100644 (file)
@@ -84,6 +84,11 @@ public interface ReplicatedLog {
      */
     List<ReplicatedLogEntry> getFrom(long index);
 
+    /**
+     *
+     * @param index the index of the log entry
+     */
+    List<ReplicatedLogEntry> getFrom(long index, int max);
 
     /**
      *
index 2a44e8b..a506662 100644 (file)
@@ -310,7 +310,7 @@ public class Leader extends AbstractRaftActorBehavior {
                     // that has fallen too far behind with the log but yet is not
                     // eligible to receive a snapshot
                     entries =
-                        context.getReplicatedLog().getFrom(nextIndex);
+                        context.getReplicatedLog().getFrom(nextIndex, 1);
                 }
 
                 followerActor.tell(
index ae8e525..9136658 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import junit.framework.Assert;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -31,6 +32,12 @@ public class AbstractReplicatedLogImplTest {
     @Before
     public void setUp() {
         replicatedLogImpl = new MockAbstractReplicatedLogImpl();
+        // create a set of initial entries in the in-memory log
+        replicatedLogImpl.append(new MockReplicatedLogEntry(1, 0, new MockPayload("A")));
+        replicatedLogImpl.append(new MockReplicatedLogEntry(1, 1, new MockPayload("B")));
+        replicatedLogImpl.append(new MockReplicatedLogEntry(1, 2, new MockPayload("C")));
+        replicatedLogImpl.append(new MockReplicatedLogEntry(2, 3, new MockPayload("D")));
+
     }
 
     @After
@@ -43,11 +50,6 @@ public class AbstractReplicatedLogImplTest {
 
     @Test
     public void testIndexOperations() {
-        // create a set of initial entries in the in-memory log
-        replicatedLogImpl.append(new MockReplicatedLogEntry(1, 0, new MockPayload("A")));
-        replicatedLogImpl.append(new MockReplicatedLogEntry(1, 1, new MockPayload("B")));
-        replicatedLogImpl.append(new MockReplicatedLogEntry(1, 2, new MockPayload("C")));
-        replicatedLogImpl.append(new MockReplicatedLogEntry(2, 3, new MockPayload("D")));
 
         // check if the values returned are correct, with snapshotIndex = -1
         assertEquals("B", replicatedLogImpl.get(1).getData().toString());
@@ -112,6 +114,22 @@ public class AbstractReplicatedLogImplTest {
 
     }
 
+    @Test
+    public void testGetFromWithMax(){
+        List<ReplicatedLogEntry> from = replicatedLogImpl.getFrom(0, 1);
+        Assert.assertEquals(1, from.size());
+        Assert.assertEquals(1, from.get(0).getTerm());
+
+        from = replicatedLogImpl.getFrom(0, 20);
+        Assert.assertEquals(4, from.size());
+        Assert.assertEquals(2, from.get(3).getTerm());
+
+        from = replicatedLogImpl.getFrom(1, 2);
+        Assert.assertEquals(2, from.size());
+        Assert.assertEquals(1, from.get(1).getTerm());
+
+    }
+
     // create a snapshot for test
     public Map takeSnapshot(int numEntries) {
         Map map = new HashMap(numEntries);
index aa50fa7..70671a6 100644 (file)
@@ -248,6 +248,23 @@ public class MockRaftActorContext implements RaftActorContext {
             return entries;
         }
 
+        @Override public List<ReplicatedLogEntry> getFrom(long index, int max) {
+            if(index >= log.size() || index < 0){
+                return Collections.EMPTY_LIST;
+            }
+            List<ReplicatedLogEntry> entries = new ArrayList<>();
+            int maxIndex = (int) index + max;
+            if(maxIndex > log.size()){
+                maxIndex = log.size();
+            }
+
+            for(int i=(int) index ; i < maxIndex ; i++) {
+                entries.add(get(i));
+            }
+            return entries;
+
+        }
+
         @Override public long size() {
             return log.size();
         }
index c763683..d478b17 100644 (file)
@@ -6,6 +6,7 @@ import akka.testkit.JavaTestKit;
 import junit.framework.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
@@ -80,12 +81,12 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
     public void testThatAnElectionTimeoutIsTriggered(){
         new JavaTestKit(getSystem()) {{
 
-            new Within(duration("1 seconds")) {
+            new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) {
                 protected void run() {
 
                     Candidate candidate = new Candidate(createActorContext(getTestActor()));
 
-                    final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "ElectionTimeout") {
+                    final Boolean out = new ExpectMsg<Boolean>(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") {
                         // do not put code outside this method, will run afterwards
                         protected Boolean match(Object in) {
                             if (in instanceof ElectionTimeout) {
index c015d95..c5a81aa 100644 (file)
@@ -5,6 +5,7 @@ import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 import junit.framework.Assert;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
@@ -41,12 +42,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
     public void testThatAnElectionTimeoutIsTriggered(){
         new JavaTestKit(getSystem()) {{
 
-            new Within(duration("1 seconds")) {
+            new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) {
                 protected void run() {
 
                     Follower follower = new Follower(createActorContext(getTestActor()));
 
-                    final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "ElectionTimeout") {
+                    final Boolean out = new ExpectMsg<Boolean>(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") {
                         // do not put code outside this method, will run afterwards
                         protected Boolean match(Object in) {
                             if (in instanceof ElectionTimeout) {
index c924b74..237d967 100644 (file)
@@ -39,7 +39,7 @@ import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
 import org.opendaylight.yangtools.concepts.Delegator;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.concepts.Registration;
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.util.ListenerRegistry;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.RpcResult;
index 709b62f..783e5c0 100644 (file)
@@ -22,7 +22,7 @@ import org.opendaylight.controller.sal.binding.api.rpc.RpcRoutingTable;
 import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeHelper;
 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.util.ListenerRegistry;
 import org.opendaylight.yangtools.yang.binding.BaseIdentity;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.RpcService;
index 1ea2eba..15314d3 100644 (file)
@@ -11,7 +11,7 @@ import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataTransaction;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.util.ListenerRegistry;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 
index 05651bf..c390fe7 100644 (file)
@@ -15,7 +15,7 @@ import org.opendaylight.controller.md.sal.binding.util.AbstractBindingSalProvide
 import org.opendaylight.controller.sal.binding.api.mount.MountProviderInstance;
 import org.opendaylight.controller.sal.binding.api.mount.MountProviderService;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.util.ListenerRegistry;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
index 258ba51..58e46ce 100644 (file)
@@ -19,7 +19,7 @@ import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
 import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker;
 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.util.ListenerRegistry;
 import org.opendaylight.yangtools.yang.binding.Notification;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
index f2e4670..13a9f1c 100644 (file)
@@ -33,7 +33,7 @@ import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeHelper;
 import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.util.ListenerRegistry;
 import org.opendaylight.yangtools.yang.binding.BaseIdentity;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.RpcService;
index 63a4ffb..356a4b8 100644 (file)
@@ -10,7 +10,7 @@ package org.opendaylight.controller.sal.binding.test.util;
 import org.opendaylight.controller.sal.core.api.model.SchemaService;
 import org.opendaylight.controller.sal.dom.broker.impl.SchemaContextProvider;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.util.ListenerRegistry;
 import org.opendaylight.yangtools.yang.model.api.Module;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
diff --git a/opendaylight/md-sal/sal-clustering-config/pom.xml b/opendaylight/md-sal/sal-clustering-config/pom.xml
new file mode 100644 (file)
index 0000000..d726823
--- /dev/null
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.opendaylight.controller</groupId>
+    <artifactId>sal-parent</artifactId>
+    <version>1.1-SNAPSHOT</version>
+  </parent>
+  <artifactId>sal-clustering-config</artifactId>
+  <description>Configuration files for md-sal clustering</description>
+  <packaging>jar</packaging>
+  <build>
+    <plugins>
+        <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>attach-artifacts</id>
+            <goals>
+              <goal>attach-artifact</goal>
+            </goals>
+            <phase>package</phase>
+            <configuration>
+              <artifacts>
+                <artifact>
+                  <file>${project.build.directory}/classes/initial/*.conf</file>
+                  <type>xml</type>
+                  <classifier>config</classifier>
+                </artifact>
+              </artifacts>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/05-clustering.xml.conf b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/05-clustering.xml.conf
new file mode 100644 (file)
index 0000000..7891ee2
--- /dev/null
@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- vi: set et smarttab sw=4 tabstop=4: -->
+<!--
+ Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<snapshot>
+    <configuration>
+        <data xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+            <modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+                <module>
+                    <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:impl">prefix:dom-inmemory-data-broker</type>
+                    <name>inmemory-data-broker</name>
+
+                    <schema-service>
+                        <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:schema-service</type>
+                        <name>yang-schema-service</name>
+                    </schema-service>
+
+                    <config-data-store>
+                        <type xmlns:config-dom-store-spi="urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:config-dom-store">config-dom-store-spi:config-dom-datastore</type>
+                        <name>distributed-config-store-service</name>
+                    </config-data-store>
+
+                    <operational-data-store>
+                        <type xmlns:operational-dom-store-spi="urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:operational-dom-store">operational-dom-store-spi:operational-dom-datastore</type>
+                        <name>distributed-operational-store-service</name>
+                    </operational-data-store>
+                </module>
+
+                <module>
+                    <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:distributed-datastore-provider">prefix:distributed-operational-datastore-provider</type>
+                    <name>distributed-operational-store-module</name>
+                    <schema-service>
+                        <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:schema-service</type>
+                        <name>yang-schema-service</name>
+                    </schema-service>
+                </module>
+
+                <module>
+                    <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:distributed-datastore-provider">prefix:distributed-config-datastore-provider</type>
+                    <name>distributed-config-store-module</name>
+                    <schema-service>
+                        <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:schema-service</type>
+                        <name>yang-schema-service</name>
+                    </schema-service>
+                </module>
+
+                <module>
+                    <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">prefix:remote-rpc-connector</type>
+                    <name>remote-rpc-connector</name>
+                    <dom-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">
+                        <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-broker-osgi-registry</type>
+                        <name>dom-broker</name>
+                    </dom-broker>
+                </module>
+
+            </modules>
+            <services xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+
+                <service>
+                    <type xmlns:config-dom-store-spi="urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:config-dom-store">config-dom-store-spi:config-dom-datastore</type>
+                    <instance>
+                        <name>distributed-config-store-service</name>
+                        <provider>/modules/module[type='distributed-config-datastore-provider'][name='distributed-config-store-module']</provider>
+                    </instance>
+                </service>
+                <service>
+                    <type xmlns:operational-dom-store-spi="urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:operational-dom-store">operational-dom-store-spi:operational-dom-datastore</type>
+                    <instance>
+                        <name>distributed-operational-store-service</name>
+                        <provider>/modules/module[type='distributed-operational-datastore-provider'][name='distributed-operational-store-module']</provider>
+                    </instance>
+                </service>
+
+            </services>
+        </data>
+    </configuration>
+    <required-capabilities>
+        <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:impl?module=opendaylight-sal-dom-broker-impl&amp;revision=2013-10-28</capability>
+    </required-capabilities>
+</snapshot>
diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf
new file mode 100644 (file)
index 0000000..9749ae2
--- /dev/null
@@ -0,0 +1,55 @@
+
+odl-cluster-data {
+  akka {
+    actor {
+      provider = "akka.cluster.ClusterActorRefProvider"
+      serializers {
+                java = "akka.serialization.JavaSerializer"
+                proto = "akka.remote.serialization.ProtobufSerializer"
+              }
+
+              serialization-bindings {
+                  "com.google.protobuf.Message" = proto
+
+              }
+    }
+    remote {
+      log-remote-lifecycle-events = off
+      netty.tcp {
+        hostname = "<CHANGE_ME>"
+        port = 2550
+           maximum-frame-size = 2097152
+           send-buffer-size = 52428800
+           receive-buffer-size = 52428800
+      }
+    }
+
+    cluster {
+      seed-nodes = ["akka.tcp://opendaylight-cluster-data@<CHANGE_ME>:2550"]
+
+      auto-down-unreachable-after = 10s
+    }
+  }
+}
+
+odl-cluster-rpc {
+  akka {
+    actor {
+      provider = "akka.cluster.ClusterActorRefProvider"
+
+    }
+    remote {
+      log-remote-lifecycle-events = off
+      netty.tcp {
+        hostname = "<CHANGE_ME>"
+        port = 2551
+      }
+    }
+
+    cluster {
+      seed-nodes = ["akka.tcp://opendaylight-cluster-rpc@<CHANGE_ME>:2551"]
+
+      auto-down-unreachable-after = 10s
+    }
+  }
+}
diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/module-shards.conf b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/module-shards.conf
new file mode 100644 (file)
index 0000000..8449abb
--- /dev/null
@@ -0,0 +1,70 @@
+# This file describes which shards live on which members
+# The format for a module-shards is as follows,
+# {
+#    name = "<friendly_name_of_the_module>"
+#    shards = [
+#        {
+#            name="<any_name_that_is_unique_for_the_module>"
+#            replicas = [
+#                "<name_of_member_on_which_to_run>"
+#            ]
+#     ]
+# }
+#
+# For Helium we support only one shard per module. Beyond Helium
+# we will support more than 1
+# The replicas section is a collection of member names. This information
+# will be used to decide on which members replicas of a particular shard will be
+# located. Once replication is integrated with the distributed data store then
+# this section can have multiple entries.
+#
+#
+
+
+module-shards = [
+    {
+        name = "default"
+        shards = [
+            {
+                name="default"
+                replicas = [
+                    "member-1"
+                ]
+            }
+        ]
+    },
+    {
+        name = "topology"
+        shards = [
+            {
+                name="topology"
+                replicas = [
+                    "member-1"
+                ]
+            }
+        ]
+    },
+    {
+        name = "inventory"
+        shards = [
+            {
+                name="inventory"
+                replicas = [
+                    "member-1"
+                ]
+            }
+        ]
+    },
+         {
+             name = "toaster"
+             shards = [
+                 {
+                     name="toaster"
+                     replicas = [
+                         "member-1"
+                     ]
+                 }
+             ]
+         }
+
+]
diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/modules.conf b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/modules.conf
new file mode 100644 (file)
index 0000000..68347ee
--- /dev/null
@@ -0,0 +1,32 @@
+# This file should describe all the modules that need to be placed in a separate shard
+# The format of the configuration is as follows
+# {
+#    name = "<friendly_name_of_module>"
+#    namespace = "<the yang namespace of the module>"
+#    shard-strategy = "module"
+# }
+#
+# Note that at this time the only shard-strategy we support is module which basically
+# will put all the data of a single module in two shards (one for config and one for
+# operational data)
+
+modules = [
+    {
+        name = "inventory"
+        namespace = "urn:opendaylight:inventory"
+        shard-strategy = "module"
+    },
+
+    {
+        name = "topology"
+        namespace = "urn:TBD:params:xml:ns:yang:network-topology"
+        shard-strategy = "module"
+    },
+
+    {
+        name = "toaster"
+        namespace = "http://netconfcentral.org/ns/toaster"
+        shard-strategy = "module"
+    }
+
+]
index a732f2f..ca6e6e9 100644 (file)
@@ -36,7 +36,7 @@ import org.opendaylight.yangtools.concepts.CompositeObjectRegistration;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.concepts.Path;
 import org.opendaylight.yangtools.concepts.Registration;
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.util.ListenerRegistry;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
index baf04fe..15c0548 100644 (file)
@@ -20,8 +20,8 @@ public class ActorSystemFactory {
 
         @Nullable @Override public ActorSystem apply(@Nullable Void aVoid) {
                 ActorSystem system =
-                    ActorSystem.create("opendaylight-cluster", ConfigFactory
-                        .load().getConfig("ODLCluster"));
+                    ActorSystem.create("opendaylight-cluster-data", ConfigFactory
+                        .load().getConfig("odl-cluster-data"));
                 system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
                 return system;
         }
index b435eda..cdf04dd 100644 (file)
@@ -33,7 +33,7 @@ public class DataChangeListener extends AbstractUntypedActor {
     }
 
     @Override public void handleReceive(Object message) throws Exception {
-        if(message.getClass().equals(DataChanged.SERIALIZABLE_CLASS)){
+        if(message instanceof DataChanged){
             dataChanged(message);
         } else if(message instanceof EnableNotification){
             enableNotification((EnableNotification) message);
@@ -51,13 +51,13 @@ public class DataChangeListener extends AbstractUntypedActor {
             return;
         }
 
-        DataChanged reply = DataChanged.fromSerialize(schemaContext,message, pathId);
+        DataChanged reply = (DataChanged) message;
         AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>
             change = reply.getChange();
         this.listener.onDataChanged(change);
 
         if(getSender() != null){
-            getSender().tell(new DataChangedReply().toSerializable(), getSelf());
+            getSender().tell(new DataChangedReply(), getSelf());
         }
     }
 
index cd9c330..a4ca456 100644 (file)
@@ -30,6 +30,6 @@ public class DataChangeListenerProxy implements AsyncDataChangeListener<YangInst
 
     @Override public void onDataChanged(
         AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
-        dataChangeListenerActor.tell(new DataChanged(schemaContext,change).toSerializable(), null);
+        dataChangeListenerActor.tell(new DataChanged(schemaContext,change), null);
     }
 }
index 780f28f..479af79 100644 (file)
@@ -8,11 +8,10 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
-import java.util.concurrent.Executors;
-
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
@@ -33,8 +32,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
+import java.util.concurrent.Executors;
 
 /**
  *
@@ -88,13 +86,12 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
 
         Object result = actorContext.executeLocalShardOperation(shardName,
             new RegisterChangeListener(path, dataChangeListenerActor.path(),
-                scope).toSerializable(),
+                scope),
             ActorContext.ASK_DURATION
         );
 
         if (result != null) {
-            RegisterChangeListenerReply reply = RegisterChangeListenerReply
-                .fromSerializable(actorContext.getActorSystem(), result);
+            RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
             return new DataChangeListenerRegistrationProxy(actorContext
                 .actorSelection(reply.getListenerRegistrationPath()), listener,
                 dataChangeListenerActor);
index 9cda3f1..10dbbc8 100644 (file)
@@ -15,6 +15,7 @@ import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Creator;
 import akka.serialization.Serialization;
+import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -33,6 +34,8 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.raft.ConfigParams;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
@@ -42,6 +45,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -49,6 +53,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A Shard represents a portion of the logical data tree <br/>
@@ -58,6 +63,8 @@ import java.util.concurrent.Executors;
  */
 public class Shard extends RaftActor {
 
+    private static final ConfigParams configParams = new ShardConfigParams();
+
     public static final String DEFAULT_NAME = "default";
 
     private final ListeningExecutorService storeExecutor =
@@ -84,7 +91,7 @@ public class Shard extends RaftActor {
     private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
 
     private Shard(String name, Map<String, String> peerAddresses) {
-        super(name, peerAddresses);
+        super(name, peerAddresses, Optional.of(configParams));
 
         this.name = name;
 
@@ -121,8 +128,8 @@ public class Shard extends RaftActor {
             } else if(getLeader() != null){
                 getLeader().forward(message, getContext());
             }
-        } else if (message.getClass().equals(RegisterChangeListener.SERIALIZABLE_CLASS)) {
-            registerChangeListener(RegisterChangeListener.fromSerializable(getContext().system(), message));
+        } else if (message instanceof RegisterChangeListener) {
+            registerChangeListener((RegisterChangeListener) message);
         } else if (message instanceof UpdateSchemaContext) {
             updateSchemaContext((UpdateSchemaContext) message);
         } else if (message instanceof ForwardedCommitTransaction) {
@@ -271,7 +278,7 @@ public class Shard extends RaftActor {
         LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = " + listenerRegistration.path().toString());
 
         getSender()
-            .tell(new RegisterChangeListenerReply(listenerRegistration.path()).toSerializable(),
+            .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
                 getSelf());
     }
 
@@ -318,9 +325,25 @@ public class Shard extends RaftActor {
         for(ActorSelection dataChangeListener : dataChangeListeners){
             dataChangeListener.tell(new EnableNotification(isLeader()), getSelf());
         }
+
+        if(getLeaderId() != null){
+            shardMBean.setLeader(getLeaderId());
+        }
+
+        shardMBean.setRaftState(getRaftState().name());
     }
 
     @Override public String persistenceId() {
         return this.name;
     }
+
+
+    private static class ShardConfigParams extends DefaultConfigParamsImpl {
+        public static final FiniteDuration HEART_BEAT_INTERVAL =
+            new FiniteDuration(500, TimeUnit.MILLISECONDS);
+
+        @Override public FiniteDuration getHeartBeatInterval() {
+            return HEART_BEAT_INTERVAL;
+        }
+    }
 }
index 5622065..915b13d 100644 (file)
@@ -76,7 +76,6 @@ public class ThreePhaseCommitCohortProxy implements
                             CanCommitTransactionReply reply =
                                     CanCommitTransactionReply.fromSerializable(response);
                             if (!reply.getCanCommit()) {
-                                System.out.println("**TOM - failed: false");
                                 return false;
                             }
                         }
index 2da6aae..4eb6a8c 100644 (file)
@@ -9,6 +9,8 @@ public class ShardStats extends AbstractBaseMBean implements ShardStatsMBean {
   private  Long committedTransactionsCount;
   private Long journalMessagesCount;
   final private String shardName;
+  private String leader;
+  private String raftState;
 
   ShardStats(String shardName){
     this.shardName = shardName;
@@ -33,6 +35,13 @@ public class ShardStats extends AbstractBaseMBean implements ShardStatsMBean {
     return journalMessagesCount;
   }
 
+  @Override public String getLeader() {
+    return leader;
+  }
+
+  @Override public String getRaftState() {
+    return raftState;
+  }
 
   public Long incrementCommittedTransactionCount() {
     return committedTransactionsCount++;
@@ -49,6 +58,13 @@ public class ShardStats extends AbstractBaseMBean implements ShardStatsMBean {
 
   }
 
+  public void setLeader(String leader){
+    this.leader = leader;
+  }
+
+  public void setRaftState(String raftState){
+    this.raftState = raftState;
+  }
 
 
   @Override
index 6f4b65a..fc7ebd9 100644 (file)
@@ -11,6 +11,8 @@ package org.opendaylight.controller.cluster.datastore.shardstrategy;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
+import java.util.List;
+
 public class ModuleShardStrategy implements ShardStrategy {
 
     public static final String NAME = "module";
@@ -25,6 +27,11 @@ public class ModuleShardStrategy implements ShardStrategy {
     }
 
     @Override public String findShard(YangInstanceIdentifier path) {
-        return configuration.getShardNamesFromModuleName(moduleName).get(0);
+        List<String> shardNames =
+            configuration.getShardNamesFromModuleName(moduleName);
+        if(shardNames.size() == 0){
+            return DefaultShardStrategy.DEFAULT_SHARD;
+        }
+        return shardNames.get(0);
     }
 }
index 2df945e..9a05c38 100644 (file)
@@ -16,6 +16,9 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 public interface ShardStrategy {
   /**
    * Find the name of the shard in which the data pointed to by the specified path belongs in
+   * <p>
+   * Should return the name of the default shard DefaultShardStrategy.DEFAULT_SHARD
+   * if no matching shard was found
    *
    * @param path The location of the data in the logical tree
    * @return
index 76914c2..daac89c 100644 (file)
@@ -1,15 +1,60 @@
-ODLCluster{
 
-actor {
-        serializers {
-          java = "akka.serialization.JavaSerializer"
-          proto = "akka.remote.serialization.ProtobufSerializer"
-        }
+odl-cluster-data {
+  akka {
+    cluster {
+        roles = [
+          "member-1"
+        ]
+    }
+    actor {
+      provider = "akka.cluster.ClusterActorRefProvider"
+      serializers {
+                java = "akka.serialization.JavaSerializer"
+                proto = "akka.remote.serialization.ProtobufSerializer"
+              }
+
+              serialization-bindings {
+                  "com.google.protobuf.Message" = proto
+
+              }
+    }
+    remote {
+      log-remote-lifecycle-events = off
+      netty.tcp {
+        hostname = "127.0.0.1"
+        port = 2550
+           maximum-frame-size = 2097152
+           send-buffer-size = 52428800
+           receive-buffer-size = 52428800
+      }
+    }
 
-        serialization-bindings {
-            "com.google.protobuf.Message" = proto
+    cluster {
+      seed-nodes = ["akka.tcp://opendaylight-cluster-data@127.0.0.1:2550"]
 
-        }
+      auto-down-unreachable-after = 10s
     }
+  }
+}
+
+odl-cluster-rpc {
+  akka {
+    actor {
+      provider = "akka.cluster.ClusterActorRefProvider"
 
-}
\ No newline at end of file
+    }
+    remote {
+      log-remote-lifecycle-events = off
+      netty.tcp {
+        hostname = "127.0.0.1"
+        port = 2551
+      }
+    }
+
+    cluster {
+      seed-nodes = ["akka.tcp://opendaylight-cluster-rpc@127.0.0.1:2551"]
+
+      auto-down-unreachable-after = 10s
+    }
+  }
+}
index 05ef33f..e820703 100644 (file)
@@ -1,7 +1,7 @@
 modules = [
     {
         name = "inventory"
-        namespace = "urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:people"
+        namespace = "urn:opendaylight:inventory"
         shard-strategy = "module"
     }
 ]
index 5d37589..6f355cb 100644 (file)
@@ -40,7 +40,7 @@ module distributed-datastore-provider {
     augment "/config:modules/config:module/config:configuration" {
         case distributed-config-datastore-provider {
             when "/config:modules/config:module/config:type = 'distributed-config-datastore-provider'";
-            container schema-service {
+            container config-schema-service {
                           uses config:service-ref {
                                refine type {
                                       mandatory false;
@@ -55,7 +55,7 @@ module distributed-datastore-provider {
         augment "/config:modules/config:module/config:configuration" {
             case distributed-operational-datastore-provider {
                 when "/config:modules/config:module/config:type = 'distributed-operational-datastore-provider'";
-                container schema-service {
+                container operational-schema-service {
                               uses config:service-ref {
                                    refine type {
                                           mandatory false;
index df3c78e..6599bd8 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
+import akka.event.Logging;
 import akka.testkit.JavaTestKit;
 import junit.framework.Assert;
 import org.junit.Test;
@@ -35,6 +36,8 @@ import scala.concurrent.duration.FiniteDuration;
 
 import java.util.Collections;
 
+import static junit.framework.Assert.assertEquals;
+
 public class BasicIntegrationTest extends AbstractActorTest {
 
     @Test
@@ -61,17 +64,24 @@ public class BasicIntegrationTest extends AbstractActorTest {
                         getRef());
 
 
-                    // Wait for Shard to become a Leader
-                    try {
-                        Thread.sleep(1000);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
+                    // Wait for a specific log message to show up
+                    final boolean result =
+                        new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
+                        ) {
+                            protected Boolean run() {
+                                return true;
+                            }
+                        }.from(shard.path().toString())
+                            .message("Switching from state Candidate to Leader")
+                            .occurrences(1).exec();
+
+                    assertEquals(true, result);
+
                     // 1. Create a TransactionChain
                     shard.tell(new CreateTransactionChain().toSerializable(), getRef());
 
                     final ActorSelection transactionChain =
-                        new ExpectMsg<ActorSelection>("CreateTransactionChainReply") {
+                        new ExpectMsg<ActorSelection>(duration("1 seconds"), "CreateTransactionChainReply") {
                             protected ActorSelection match(Object in) {
                                 if (in.getClass().equals(CreateTransactionChainReply.SERIALIZABLE_CLASS)) {
                                     ActorPath transactionChainPath =
@@ -93,7 +103,7 @@ public class BasicIntegrationTest extends AbstractActorTest {
                     transactionChain.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.WRITE_ONLY.ordinal() ).toSerializable(), getRef());
 
                     final ActorSelection transaction =
-                        new ExpectMsg<ActorSelection>("CreateTransactionReply") {
+                        new ExpectMsg<ActorSelection>(duration("1 seconds"), "CreateTransactionReply") {
                             protected ActorSelection match(Object in) {
                                 if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(in.getClass())) {
                                     CreateTransactionReply reply = CreateTransactionReply.fromSerializable(in);
@@ -115,7 +125,7 @@ public class BasicIntegrationTest extends AbstractActorTest {
                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
                         getRef());
 
-                    Boolean writeDone = new ExpectMsg<Boolean>("WriteDataReply") {
+                    Boolean writeDone = new ExpectMsg<Boolean>(duration("1 seconds"), "WriteDataReply") {
                         protected Boolean match(Object in) {
                             if (in.getClass().equals(WriteDataReply.SERIALIZABLE_CLASS)) {
                                 return true;
@@ -134,7 +144,7 @@ public class BasicIntegrationTest extends AbstractActorTest {
                     transaction.tell(new ReadyTransaction().toSerializable(), getRef());
 
                     final ActorSelection cohort =
-                        new ExpectMsg<ActorSelection>("ReadyTransactionReply") {
+                        new ExpectMsg<ActorSelection>(duration("1 seconds"), "ReadyTransactionReply") {
                             protected ActorSelection match(Object in) {
                                 if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
                                     ActorPath cohortPath =
@@ -157,7 +167,7 @@ public class BasicIntegrationTest extends AbstractActorTest {
                     cohort.tell(new PreCommitTransaction().toSerializable(), getRef());
 
                     Boolean preCommitDone =
-                        new ExpectMsg<Boolean>("PreCommitTransactionReply") {
+                        new ExpectMsg<Boolean>(duration("1 seconds"), "PreCommitTransactionReply") {
                             protected Boolean match(Object in) {
                                 if (in.getClass().equals(PreCommitTransactionReply.SERIALIZABLE_CLASS)) {
                                     return true;
index 8c1cbbb..b2ee4a4 100644 (file)
@@ -94,7 +94,7 @@ public class DataChangeListenerProxyTest extends AbstractActorTest {
 
         Assert.assertEquals(1, listMessages.size());
 
-        Assert.assertTrue(listMessages.get(0).getClass().equals(DataChanged.SERIALIZABLE_CLASS));
+        Assert.assertTrue(listMessages.get(0).getClass().equals(DataChanged.class));
 
     }
 }
index 8413bac..9202485 100644 (file)
@@ -41,7 +41,7 @@ public class DataChangeListenerRegistrationTest extends AbstractActorTest {
 
           subject.tell(new CloseDataChangeListenerRegistration().toSerializable(), getRef());
 
-          final String out = new ExpectMsg<String>("match hint") {
+          final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
             // do not put code outside this method, will run afterwards
             protected String match(Object in) {
               if (in.getClass().equals(CloseDataChangeListenerRegistrationReply.SERIALIZABLE_CLASS)) {
index c4ec8b4..26ec583 100644 (file)
@@ -102,13 +102,13 @@ public class DataChangeListenerTest extends AbstractActorTest {
                     subject.tell(new EnableNotification(true), getRef());
 
                     subject.tell(
-                        new DataChanged(CompositeModel.createTestContext(),new MockDataChangedEvent()).toSerializable(),
+                        new DataChanged(CompositeModel.createTestContext(),new MockDataChangedEvent()),
                         getRef());
 
                     final Boolean out = new ExpectMsg<Boolean>(duration("800 millis"), "dataChanged") {
                         // do not put code outside this method, will run afterwards
                         protected Boolean match(Object in) {
-                            if (in != null && in.getClass().equals(DataChangedReply.SERIALIZABLE_CLASS)) {
+                            if (in != null && in.getClass().equals(DataChangedReply.class)) {
 
                                 return true;
                             } else {
@@ -141,7 +141,7 @@ public class DataChangeListenerTest extends AbstractActorTest {
                 protected void run() {
 
                     subject.tell(
-                        new DataChanged(CompositeModel.createTestContext(),new MockDataChangedEvent()).toSerializable(),
+                        new DataChanged(CompositeModel.createTestContext(),new MockDataChangedEvent()),
                         getRef());
 
                     expectNoMsg();
index 0a0c04b..fc527b6 100644 (file)
@@ -1,11 +1,12 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSystem;
+import akka.event.Logging;
 import akka.testkit.JavaTestKit;
-
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
 import junit.framework.Assert;
+import org.apache.commons.io.FileUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -20,19 +21,29 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
 
-public class DistributedDataStoreIntegrationTest{
+public class DistributedDataStoreIntegrationTest {
 
     private static ActorSystem system;
 
     @Before
-    public void setUp() {
+    public void setUp() throws IOException {
+        File journal = new File("journal");
+
+        if(journal.exists()) {
+            FileUtils.deleteDirectory(journal);
+        }
+
+
         System.setProperty("shard.persistent", "false");
         system = ActorSystem.create("test");
     }
@@ -49,82 +60,153 @@ public class DistributedDataStoreIntegrationTest{
 
     @Test
     public void integrationTest() throws Exception {
-        Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
+        final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
         ShardStrategyFactory.setConfiguration(configuration);
-        DistributedDataStore distributedDataStore =
-            new DistributedDataStore(getSystem(), "config", new MockClusterWrapper(), configuration);
 
-        distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
 
-        Thread.sleep(1500);
 
-        DOMStoreReadWriteTransaction transaction =
-            distributedDataStore.newReadWriteTransaction();
+        new JavaTestKit(getSystem()) {
+            {
+
+                new Within(duration("10 seconds")) {
+                    protected void run() {
+                        try {
+                            final DistributedDataStore distributedDataStore =
+                                new DistributedDataStore(getSystem(), "config", new MockClusterWrapper(), configuration);
+
+                            distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
+
+                            // Wait for a specific log message to show up
+                            final boolean result =
+                                new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
+                                    ) {
+                                    protected Boolean run() {
+                                        return true;
+                                    }
+                                }.from("akka://test/user/shardmanager-config/member-1-shard-test-1-config")
+                                    .message("Switching from state Candidate to Leader")
+                                    .occurrences(1).exec();
+
+                            assertEquals(true, result);
+
+                            DOMStoreReadWriteTransaction transaction =
+                                distributedDataStore.newReadWriteTransaction();
 
-        transaction.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+                            transaction
+                                .write(TestModel.TEST_PATH, ImmutableNodes
+                                    .containerNode(TestModel.TEST_QNAME));
 
-        ListenableFuture<Optional<NormalizedNode<?, ?>>> future =
-            transaction.read(TestModel.TEST_PATH);
+                            ListenableFuture<Optional<NormalizedNode<?, ?>>>
+                                future =
+                                transaction.read(TestModel.TEST_PATH);
 
-        Optional<NormalizedNode<?, ?>> optional = future.get();
+                            Optional<NormalizedNode<?, ?>> optional =
+                                future.get();
 
-        Assert.assertTrue(optional.isPresent());
+                            Assert.assertTrue("Node not found", optional.isPresent());
 
-        NormalizedNode<?, ?> normalizedNode = optional.get();
+                            NormalizedNode<?, ?> normalizedNode =
+                                optional.get();
 
-        assertEquals(TestModel.TEST_QNAME, normalizedNode.getNodeType());
+                            assertEquals(TestModel.TEST_QNAME,
+                                normalizedNode.getNodeType());
 
-        DOMStoreThreePhaseCommitCohort ready = transaction.ready();
+                            DOMStoreThreePhaseCommitCohort ready =
+                                transaction.ready();
 
-        ListenableFuture<Boolean> canCommit = ready.canCommit();
+                            ListenableFuture<Boolean> canCommit =
+                                ready.canCommit();
 
-        assertTrue(canCommit.get(5, TimeUnit.SECONDS));
+                            assertTrue(canCommit.get(5, TimeUnit.SECONDS));
 
-        ListenableFuture<Void> preCommit = ready.preCommit();
+                            ListenableFuture<Void> preCommit =
+                                ready.preCommit();
 
-        preCommit.get(5, TimeUnit.SECONDS);
+                            preCommit.get(5, TimeUnit.SECONDS);
 
-        ListenableFuture<Void> commit = ready.commit();
+                            ListenableFuture<Void> commit = ready.commit();
+
+                            commit.get(5, TimeUnit.SECONDS);
+                        } catch (ExecutionException | TimeoutException | InterruptedException e){
+                            fail(e.getMessage());
+                        }
+                    }
+                };
+            }
+        };
 
-        commit.get(5, TimeUnit.SECONDS);
     }
 
 
-    @Test
+    //FIXME : Disabling test because it's flaky
+    //@Test
     public void integrationTestWithMultiShardConfiguration()
         throws ExecutionException, InterruptedException, TimeoutException {
-        Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
+        final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
 
         ShardStrategyFactory.setConfiguration(configuration);
-        DistributedDataStore distributedDataStore =
-            new DistributedDataStore(getSystem(), "config", new MockClusterWrapper(), configuration);
 
+        new JavaTestKit(getSystem()) {
+            {
+
+                new Within(duration("10 seconds")) {
+                    protected void run() {
+                        try {
+                            final DistributedDataStore distributedDataStore =
+                                new DistributedDataStore(getSystem(), "config",
+                                    new MockClusterWrapper(), configuration);
+
+                            distributedDataStore.onGlobalContextUpdated(
+                                SchemaContextHelper.full());
+
+                            // Wait for a specific log message to show up
+                            final boolean result =
+                                new JavaTestKit.EventFilter<Boolean>(
+                                    Logging.Info.class
+                                ) {
+                                    protected Boolean run() {
+                                        return true;
+                                    }
+                                }.from(
+                                    "akka://test/user/shardmanager-config/member-1-shard-cars-1-config")
+                                    .message(
+                                        "Switching from state Candidate to Leader")
+                                    .occurrences(1)
+                                    .exec();
+
+                            Thread.sleep(1000);
+
+
+                            DOMStoreReadWriteTransaction transaction =
+                                distributedDataStore.newReadWriteTransaction();
 
-        distributedDataStore.onGlobalContextUpdated(SchemaContextHelper.full());
+                            transaction.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+                            transaction.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
 
-        // This sleep is fragile - test can fail intermittently if all Shards aren't updated with
-        // the SchemaContext in time. Is there any way we can make this deterministic?
-        Thread.sleep(2000);
+                            DOMStoreThreePhaseCommitCohort ready = transaction.ready();
 
-        DOMStoreReadWriteTransaction transaction =
-            distributedDataStore.newReadWriteTransaction();
+                            ListenableFuture<Boolean> canCommit = ready.canCommit();
 
-        transaction.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
-        transaction.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
+                            assertTrue(canCommit.get(5, TimeUnit.SECONDS));
 
-        DOMStoreThreePhaseCommitCohort ready = transaction.ready();
+                            ListenableFuture<Void> preCommit = ready.preCommit();
 
-        ListenableFuture<Boolean> canCommit = ready.canCommit();
+                            preCommit.get(5, TimeUnit.SECONDS);
 
-        assertTrue(canCommit.get(5, TimeUnit.SECONDS));
+                            ListenableFuture<Void> commit = ready.commit();
 
-        ListenableFuture<Void> preCommit = ready.preCommit();
+                            commit.get(5, TimeUnit.SECONDS);
 
-        preCommit.get(5, TimeUnit.SECONDS);
+                            assertEquals(true, result);
+                        } catch(ExecutionException | TimeoutException | InterruptedException e){
+                            fail(e.getMessage());
+                        }
+                    }
+                };
+            }
+        };
 
-        ListenableFuture<Void> commit = ready.commit();
 
-        commit.get(5, TimeUnit.SECONDS);
     }
 
 }
index 03191f7..d1beab9 100644 (file)
@@ -73,7 +73,7 @@ public class DistributedDataStoreTest extends AbstractActorTest{
     @org.junit.Test
     public void testRegisterChangeListenerWhenShardIsLocal() throws Exception {
 
-        mockActorContext.setExecuteLocalShardOperationResponse(new RegisterChangeListenerReply(doNothingActorRef.path()).toSerializable());
+        mockActorContext.setExecuteLocalShardOperationResponse(new RegisterChangeListenerReply(doNothingActorRef.path()));
 
         ListenerRegistration registration =
             distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
index 268ed3c..e9ad450 100644 (file)
@@ -75,7 +75,7 @@ public class ShardManagerTest {
 
                     subject.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
 
-                    expectMsgClass(PrimaryFound.SERIALIZABLE_CLASS);
+                    expectMsgClass(duration("1 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
 
                     expectNoMsg();
                 }
@@ -170,7 +170,7 @@ public class ShardManagerTest {
 
                     subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
 
-                    final String out = new ExpectMsg<String>("primary found") {
+                    final String out = new ExpectMsg<String>(duration("1 seconds"), "primary found") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
                             if (in.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
@@ -208,13 +208,13 @@ public class ShardManagerTest {
 
                     subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
 
-                    expectMsgClass(PrimaryFound.SERIALIZABLE_CLASS);
+                    expectMsgClass(duration("1 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
 
                     MockClusterWrapper.sendMemberRemoved(subject, "member-2", getRef().path().toString());
 
                     subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
 
-                    expectMsgClass(PrimaryNotFound.SERIALIZABLE_CLASS);
+                    expectMsgClass(duration("1 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
 
                     expectNoMsg();
                 }
index ee112a4..431a266 100644 (file)
@@ -2,7 +2,9 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
+import akka.event.Logging;
 import akka.testkit.JavaTestKit;
+import junit.framework.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
@@ -38,19 +40,25 @@ public class ShardTest extends AbstractActorTest {
                 getSystem().actorOf(props, "testCreateTransactionChain");
 
 
-            // Wait for Shard to become a Leader
-            try {
-                Thread.sleep(1000);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
+            // Wait for a specific log message to show up
+            final boolean result =
+                new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
+                ) {
+                    protected Boolean run() {
+                        return true;
+                    }
+                }.from(subject.path().toString())
+                    .message("Switching from state Candidate to Leader")
+                    .occurrences(1).exec();
+
+            Assert.assertEquals(true, result);
 
             new Within(duration("1 seconds")) {
                 protected void run() {
 
                     subject.tell(new CreateTransactionChain().toSerializable(), getRef());
 
-                    final String out = new ExpectMsg<String>("match hint") {
+                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
                             if (in.getClass().equals(CreateTransactionChainReply.SERIALIZABLE_CLASS)){
@@ -91,7 +99,7 @@ public class ShardTest extends AbstractActorTest {
                         getRef());
 
                     subject.tell(new RegisterChangeListener(TestModel.TEST_PATH,
-                        getRef().path(), AsyncDataBroker.DataChangeScope.BASE).toSerializable(),
+                        getRef().path(), AsyncDataBroker.DataChangeScope.BASE),
                         getRef());
 
                     final Boolean notificationEnabled = new ExpectMsg<Boolean>("enable notification") {
@@ -107,12 +115,12 @@ public class ShardTest extends AbstractActorTest {
 
                     assertFalse(notificationEnabled);
 
-                    final String out = new ExpectMsg<String>("match hint") {
+                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
-                            if (in.getClass().equals(RegisterChangeListenerReply.SERIALIZABLE_CLASS)) {
+                            if (in.getClass().equals(RegisterChangeListenerReply.class)) {
                                 RegisterChangeListenerReply reply =
-                                    RegisterChangeListenerReply.fromSerializable(getSystem(),in);
+                                    (RegisterChangeListenerReply) in;
                                 return reply.getListenerRegistrationPath()
                                     .toString();
                             } else {
@@ -138,13 +146,18 @@ public class ShardTest extends AbstractActorTest {
                 getSystem().actorOf(props, "testCreateTransaction");
 
 
-            // Wait for Shard to become a Leader
-            try {
-                Thread.sleep(1000);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
+            // Wait for a specific log message to show up
+            final boolean result =
+                new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
+                ) {
+                    protected Boolean run() {
+                        return true;
+                    }
+                }.from(subject.path().toString())
+                    .message("Switching from state Candidate to Leader")
+                    .occurrences(1).exec();
 
+            Assert.assertEquals(true, result);
 
             new Within(duration("1 seconds")) {
                 protected void run() {
@@ -156,7 +169,7 @@ public class ShardTest extends AbstractActorTest {
                     subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(),
                         getRef());
 
-                    final String out = new ExpectMsg<String>("match hint") {
+                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
                             if (in instanceof CreateTransactionReply) {
index 57d0bd6..b35880a 100644 (file)
@@ -35,7 +35,7 @@ public class ShardTransactionChainTest extends AbstractActorTest {
 
           subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
 
-          final String out = new ExpectMsg<String>("match hint") {
+          final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
             // do not put code outside this method, will run afterwards
             protected String match(Object in) {
               if (in.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
@@ -70,7 +70,7 @@ public class ShardTransactionChainTest extends AbstractActorTest {
 
           subject.tell(new CloseTransactionChain().toSerializable(), getRef());
 
-          final String out = new ExpectMsg<String>("match hint") {
+          final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
             // do not put code outside this method, will run afterwards
             protected String match(Object in) {
               if (in.getClass().equals(CloseTransactionChainReply.SERIALIZABLE_CLASS)) {
index f15e3bf..632ecc2 100644 (file)
@@ -65,7 +65,7 @@ public class ShardTransactionTest extends AbstractActorTest {
                         new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
                         getRef());
 
-                    final String out = new ExpectMsg<String>("match hint") {
+                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
                             if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
@@ -105,7 +105,7 @@ public class ShardTransactionTest extends AbstractActorTest {
                         new ReadData(TestModel.TEST_PATH).toSerializable(),
                         getRef());
 
-                    final String out = new ExpectMsg<String>("match hint") {
+                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
                             if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
@@ -141,7 +141,7 @@ public class ShardTransactionTest extends AbstractActorTest {
                             getRef());
 
                     final CompositeModification compositeModification =
-                        new ExpectMsg<CompositeModification>("match hint") {
+                        new ExpectMsg<CompositeModification>(duration("1 seconds"), "match hint") {
                             // do not put code outside this method, will run afterwards
                             protected CompositeModification match(Object in) {
                                 if (in instanceof ShardTransaction.GetCompositeModificationReply) {
@@ -180,7 +180,7 @@ public class ShardTransactionTest extends AbstractActorTest {
                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
                         getRef());
 
-                    final String out = new ExpectMsg<String>("match hint") {
+                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
                             if (in.getClass().equals(WriteDataReply.SERIALIZABLE_CLASS)) {
@@ -255,7 +255,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
                     subject.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
 
-                    final String out = new ExpectMsg<String>("match hint") {
+                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
                             if (in.getClass().equals(DeleteDataReply.SERIALIZABLE_CLASS)) {
@@ -292,7 +292,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
                     subject.tell(new ReadyTransaction().toSerializable(), getRef());
 
-                    final String out = new ExpectMsg<String>("match hint") {
+                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
                             if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
@@ -330,7 +330,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
                     subject.tell(new CloseTransaction().toSerializable(), getRef());
 
-                    final String out = new ExpectMsg<String>("match hint") {
+                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
                             if (in.getClass().equals(CloseTransactionReply.SERIALIZABLE_CLASS)) {
@@ -343,7 +343,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
                     assertEquals("match", out);
 
-                    final String termination = new ExpectMsg<String>("match hint") {
+                    final String termination = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
                             if (in instanceof Terminated) {
index 88753e4..3394cdc 100644 (file)
@@ -1,6 +1,5 @@
 package org.opendaylight.controller.cluster.datastore.shardstrategy;
 
-import junit.framework.Assert;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -8,6 +7,10 @@ import org.junit.rules.ExpectedException;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.ConfigurationImpl;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+import static junit.framework.Assert.assertEquals;
 
 public class ModuleShardStrategyTest {
     @Rule
@@ -28,6 +31,23 @@ public class ModuleShardStrategyTest {
 
         String shard = moduleShardStrategy.findShard(CarsModel.BASE_PATH);
 
-        Assert.assertEquals("cars-1", shard);
+        assertEquals("cars-1", shard);
+    }
+
+    @Test
+    public void testFindShardWhenModuleConfigurationPresentInModulesButMissingInModuleShards() {
+
+        final QName BASE_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:missing", "2014-03-13",
+            "missing");
+
+        final YangInstanceIdentifier BASE_PATH = YangInstanceIdentifier.of(BASE_QNAME);
+
+        ModuleShardStrategy moduleShardStrategy =
+            new ModuleShardStrategy("missing", configuration);
+
+        String shard = moduleShardStrategy.findShard(BASE_PATH);
+
+        assertEquals(DefaultShardStrategy.DEFAULT_SHARD, shard);
+
     }
 }
index aebff27..eda1c30 100644 (file)
@@ -1,4 +1,5 @@
 akka {
+    loggers = [akka.testkit.TestEventListener]
     actor {
          serializers {
                   java = "akka.serialization.JavaSerializer"
index 22854cb..f4919e7 100644 (file)
@@ -15,4 +15,10 @@ modules = [
      shard-strategy = "module"
     }
 
+    {
+     name = "missing"
+     namespace = "urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:missing"
+     shard-strategy = "module"
+    }
+
 ]
diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcImplementationUnavailableException.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcImplementationUnavailableException.java
new file mode 100644 (file)
index 0000000..3710822
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.core.api;
+
+/**
+ * Exception reported when no RPC implementation is found in the system.
+ */
+public class RpcImplementationUnavailableException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+
+    public RpcImplementationUnavailableException(final String message) {
+        super(message);
+    }
+
+    public RpcImplementationUnavailableException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+}
index 4165066..8944e19 100644 (file)
@@ -21,7 +21,7 @@ import org.opendaylight.controller.md.sal.dom.broker.spi.mount.SimpleDOMMountPoi
 import org.opendaylight.controller.sal.core.api.mount.MountProvisionListener;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.concepts.ObjectRegistration;
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.util.ListenerRegistry;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
index b01db3d..fef2a80 100644 (file)
@@ -18,7 +18,7 @@ import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
 import org.opendaylight.controller.sal.core.api.mount.MountProvisionListener;
 import org.opendaylight.controller.sal.core.api.mount.MountProvisionService;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.util.ListenerRegistry;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 public class BackwardsCompatibleMountPointManager implements MountProvisionService, MountProvisionListener {
index f0dd5b9..df4549f 100644 (file)
@@ -11,7 +11,7 @@ import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataTransaction;
 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.util.ListenerRegistry;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
index d84f1dc..434cf85 100644 (file)
@@ -17,7 +17,7 @@ import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
 import org.opendaylight.controller.sal.core.api.mount.MountProvisionListener;
 import org.opendaylight.controller.sal.core.api.mount.MountProvisionService;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.util.ListenerRegistry;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 @Deprecated
index 19ff03b..c8e3c0b 100644 (file)
@@ -8,7 +8,9 @@
 package org.opendaylight.controller.sal.dom.broker.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListenableFuture;
 
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -22,11 +24,8 @@ import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.SimpleNode;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable<RpcRoutingContext> {
 
@@ -81,9 +80,9 @@ class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiabl
         }
         if (potential == null) {
             return router.invokeRpc(rpc, (YangInstanceIdentifier) route, input);
+        } else {
+            return potential.invokeRpc(rpc, input);
         }
-        checkState(potential != null, "No implementation is available for rpc:%s path:%s", rpc, route);
-        return potential.invokeRpc(rpc, input);
     }
 
     public void addPath(final QName context, final YangInstanceIdentifier path, final RoutedRpcRegImpl routedRpcRegImpl) {
index 44e7abc..b4d7d2d 100644 (file)
@@ -10,6 +10,13 @@ package org.opendaylight.controller.sal.dom.broker.impl;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -22,12 +29,13 @@ import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
 import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
 import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.sal.core.api.RpcImplementationUnavailableException;
 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
 import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.util.ListenerRegistry;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
@@ -38,12 +46,9 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.ListenableFuture;
-
+/**
+ * RPC broker responsible for routing requests to remote systems.
+ */
 public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, RoutedRpcDefaultImplementation {
 
     private static final Logger LOG = LoggerFactory.getLogger(SchemaAwareRpcBroker.class);
@@ -217,8 +222,12 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, Ro
 
     @Override
     public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(final QName rpc, final YangInstanceIdentifier route, final CompositeNode input) {
-      checkState(defaultDelegate != null, "No implementation is available for rpc:%s path:%s", rpc, route);
-      return defaultDelegate.invokeRpc(rpc, route, input);
+        if (defaultDelegate == null) {
+            return Futures.immediateFailedCheckedFuture(new RpcImplementationUnavailableException("No RPC implementation found"));
+        }
+
+        LOG.debug("Forwarding RPC {} path {} to delegate {}", rpc, route);
+        return defaultDelegate.invokeRpc(rpc, route, input);
     }
 
     void remove(final GlobalRpcRegistration registration) {
index ff64cd6..3ddf0b6 100644 (file)
@@ -9,9 +9,15 @@ package org.opendaylight.controller.md.sal.dom.store.impl;
 
 import static org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.builder;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
@@ -37,13 +43,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-
 /**
  * Resolve Data Change Events based on modifications and listeners
  *
@@ -278,6 +277,11 @@ final class ResolveDataChangeEventsTask implements Callable<Iterable<ChangeListe
             final Collection<Node> listeners, final NormalizedNode<?, ?> beforeData,
             final NormalizedNode<?, ?> afterData) {
 
+        // FIXME: BUG-1493: check the listeners to prune unneeded changes:
+        //                  for subtrees, we have to do all
+        //                  for one, we need to expand children
+        //                  for base, we just report replacement
+
         if (beforeData instanceof NormalizedNodeContainer<?, ?, ?>) {
             // Node is container (contains child) and we have interested
             // listeners registered for it, that means we need to do
@@ -306,14 +310,12 @@ final class ResolveDataChangeEventsTask implements Callable<Iterable<ChangeListe
             final Collection<Node> listeners,
             final NormalizedNodeContainer<?, PathArgument, NormalizedNode<PathArgument, ?>> beforeCont,
                     final NormalizedNodeContainer<?, PathArgument, NormalizedNode<PathArgument, ?>> afterCont) {
-        final Set<PathArgument> alreadyProcessed = new HashSet<>();
         final List<DOMImmutableDataChangeEvent> childChanges = new LinkedList<>();
 
-        DataChangeScope potentialScope = DataChangeScope.BASE;
         // We look at all children from before and compare it with after state.
         for (NormalizedNode<PathArgument, ?> beforeChild : beforeCont.getValue()) {
-            PathArgument childId = beforeChild.getIdentifier();
-            alreadyProcessed.add(childId);
+            final PathArgument childId = beforeChild.getIdentifier();
+
             YangInstanceIdentifier childPath = path.node(childId);
             Collection<ListenerTree.Node> childListeners = getListenerChildrenWildcarded(listeners, childId);
             Optional<NormalizedNode<PathArgument, ?>> afterChild = afterCont.getChild(childId);
@@ -323,15 +325,17 @@ final class ResolveDataChangeEventsTask implements Callable<Iterable<ChangeListe
             if (childChange != NO_CHANGE) {
                 childChanges.add(childChange);
             }
-
         }
 
         for (NormalizedNode<PathArgument, ?> afterChild : afterCont.getValue()) {
-            PathArgument childId = afterChild.getIdentifier();
-            if (!alreadyProcessed.contains(childId)) {
-                // We did not processed that child already
-                // and it was not present in previous loop, that means it is
-                // created.
+            final PathArgument childId = afterChild.getIdentifier();
+
+            /*
+             * We have already iterated of the before-children, so have already
+             * emitted modify/delete events. This means the child has been
+             * created.
+             */
+            if (!beforeCont.getChild(childId).isPresent()) {
                 Collection<ListenerTree.Node> childListeners = getListenerChildrenWildcarded(listeners, childId);
                 YangInstanceIdentifier childPath = path.node(childId);
                 childChanges.add(resolveSameEventRecursivelly(childPath , childListeners, afterChild,
@@ -342,7 +346,7 @@ final class ResolveDataChangeEventsTask implements Callable<Iterable<ChangeListe
             return NO_CHANGE;
         }
 
-        Builder eventBuilder = builder(potentialScope) //
+        Builder eventBuilder = builder(DataChangeScope.BASE) //
                 .setBefore(beforeCont) //
                 .setAfter(afterCont)
                 .addUpdated(path, beforeCont, afterCont);
index 07d3c08..350132c 100644 (file)
@@ -7,17 +7,11 @@
  */
 package org.opendaylight.controller.sal.connect.netconf;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import java.io.InputStream;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+
 import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
 import org.opendaylight.controller.sal.connect.api.MessageTransformer;
@@ -40,6 +34,14 @@ import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
 /**
  *  This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade
  */
@@ -54,11 +56,20 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
     private final MessageTransformer<NetconfMessage> messageTransformer;
     private final SchemaContextProviderFactory schemaContextProviderFactory;
     private final SchemaSourceProviderFactory<InputStream> sourceProviderFactory;
+    private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver;
     private final NotificationHandler notificationHandler;
 
     public static NetconfDevice createNetconfDevice(final RemoteDeviceId id,
             final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider,
             final ExecutorService executor, final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade) {
+        return createNetconfDevice(id, schemaSourceProvider, executor, salFacade, new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
+    }
+
+    @VisibleForTesting
+    protected static NetconfDevice createNetconfDevice(final RemoteDeviceId id,
+            final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider,
+            final ExecutorService executor, final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade,
+            final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) {
 
         return new NetconfDevice(id, salFacade, executor, new NetconfMessageTransformer(),
                 new NetconfDeviceSchemaProviderFactory(id), new SchemaSourceProviderFactory<InputStream>() {
@@ -67,18 +78,20 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
                         return schemaSourceProvider.createInstanceFor(new NetconfRemoteSchemaSourceProvider(id,
                                 deviceRpc));
                     }
-                });
+                }, stateSchemasResolver);
     }
 
     @VisibleForTesting
     protected NetconfDevice(final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade,
-            final ExecutorService processingExecutor, final MessageTransformer<NetconfMessage> messageTransformer,
-            final SchemaContextProviderFactory schemaContextProviderFactory,
-            final SchemaSourceProviderFactory<InputStream> sourceProviderFactory) {
+                            final ExecutorService processingExecutor, final MessageTransformer<NetconfMessage> messageTransformer,
+                            final SchemaContextProviderFactory schemaContextProviderFactory,
+                            final SchemaSourceProviderFactory<InputStream> sourceProviderFactory,
+                            final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) {
         this.id = id;
         this.messageTransformer = messageTransformer;
         this.salFacade = salFacade;
         this.sourceProviderFactory = sourceProviderFactory;
+        this.stateSchemasResolver = stateSchemasResolver;
         this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor);
         this.schemaContextProviderFactory = schemaContextProviderFactory;
         this.notificationHandler = new NotificationHandler(salFacade, messageTransformer, id);
@@ -98,6 +111,11 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
             @Override
             public void run() {
                 final NetconfDeviceRpc deviceRpc = setUpDeviceRpc(remoteSessionCapabilities, listener);
+
+                final NetconfStateSchemas availableSchemas = stateSchemasResolver.resolve(deviceRpc, remoteSessionCapabilities, id);
+                logger.warn("{}: Schemas exposed by ietf-netconf-monitoring: {}", id, availableSchemas.getAvailableYangSchemasQNames());
+                // TODO use this for shared schema context
+
                 final SchemaSourceProvider<InputStream> delegate = sourceProviderFactory.createSourceProvider(deviceRpc);
                 final SchemaContextProvider schemaContextProvider = setUpSchemaContext(delegate, remoteSessionCapabilities);
                 updateMessageTransformer(schemaContextProvider);
@@ -204,6 +222,6 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
             Preconditions.checkNotNull(parsedNotification);
             salFacade.onNotification(parsedNotification);
         }
-
     }
+
 }
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfStateSchemas.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfStateSchemas.java
new file mode 100644 (file)
index 0000000..b540034
--- /dev/null
@@ -0,0 +1,213 @@
+package org.opendaylight.controller.sal.connect.netconf;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.net.URI;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc;
+import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.NetconfState;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.Yang;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Schemas;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.schemas.Schema;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.api.SimpleNode;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.impl.NodeFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds QNames for all yang modules reported by ietf-netconf-monitoring/state/schemas
+ */
+public final class NetconfStateSchemas {
+
+    private static final Logger logger = LoggerFactory.getLogger(NetconfStateSchemas.class);
+
+    /**
+     * Factory for NetconfStateSchemas
+     */
+    public interface NetconfStateSchemasResolver {
+        NetconfStateSchemas resolve(final NetconfDeviceRpc deviceRpc, final NetconfSessionCapabilities remoteSessionCapabilities, final RemoteDeviceId id);
+    }
+
+    /**
+     * Default implementation resolving schemas QNames from netconf-state
+     */
+    public static final class NetconfStateSchemasResolverImpl implements NetconfStateSchemasResolver {
+
+        @Override
+        public NetconfStateSchemas resolve(final NetconfDeviceRpc deviceRpc, final NetconfSessionCapabilities remoteSessionCapabilities, final RemoteDeviceId id) {
+            return NetconfStateSchemas.create(deviceRpc, remoteSessionCapabilities, id);
+        }
+    }
+
+    public static final NetconfStateSchemas EMPTY = new NetconfStateSchemas(Collections.<RemoteYangSchema>emptySet());
+
+    private static final YangInstanceIdentifier STATE_SCHEMAS_IDENTIFIER =
+            YangInstanceIdentifier.builder().node(NetconfState.QNAME).node(Schemas.QNAME).build();
+    private static final YangInstanceIdentifier DATA_STATE_SCHEMAS_IDENTIFIER =
+            YangInstanceIdentifier.builder().node(NetconfMessageTransformUtil.NETCONF_DATA_QNAME)
+                    .node(NetconfState.QNAME).node(Schemas.QNAME).build();
+
+    private static final CompositeNode GET_SCHEMAS_RPC;
+    static {
+        final Node<?> filter = NetconfMessageTransformUtil.toFilterStructure(STATE_SCHEMAS_IDENTIFIER);
+        GET_SCHEMAS_RPC
+                = NodeFactory.createImmutableCompositeNode(NetconfMessageTransformUtil.NETCONF_GET_QNAME, null, Lists.<Node<?>>newArrayList(filter));
+    }
+
+    private final Set<RemoteYangSchema> availableYangSchemas;
+
+    public NetconfStateSchemas(final Set<RemoteYangSchema> availableYangSchemas) {
+        this.availableYangSchemas = availableYangSchemas;
+    }
+
+    public Set<RemoteYangSchema> getAvailableYangSchemas() {
+        return availableYangSchemas;
+    }
+
+    public Set<QName> getAvailableYangSchemasQNames() {
+        return Sets.newHashSet(Collections2.transform(getAvailableYangSchemas(), new Function<RemoteYangSchema, QName>() {
+            @Override
+            public QName apply(final RemoteYangSchema input) {
+                return input.getQName();
+            }
+        }));
+    }
+
+    /**
+     * Issue get request to remote device and parse response to find all schemas under netconf-state/schemas
+     */
+    private static NetconfStateSchemas create(final NetconfDeviceRpc deviceRpc, final NetconfSessionCapabilities remoteSessionCapabilities, final RemoteDeviceId id) {
+        if(remoteSessionCapabilities.isMonitoringSupported() == false) {
+            logger.warn("{}: Netconf monitoring not supported on device, cannot detect available schemas");
+            return EMPTY;
+        }
+
+        final RpcResult<CompositeNode> schemasNodeResult;
+        try {
+            schemasNodeResult = deviceRpc.invokeRpc(NetconfMessageTransformUtil.NETCONF_GET_QNAME, GET_SCHEMAS_RPC).get();
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(id + ": Interrupted while waiting for response to " + STATE_SCHEMAS_IDENTIFIER, e);
+        } catch (final ExecutionException e) {
+            logger.warn("{}: Unable to detect available schemas, get to {} failed", id, STATE_SCHEMAS_IDENTIFIER, e);
+            return EMPTY;
+        }
+
+        if(schemasNodeResult.isSuccessful() == false) {
+            logger.warn("{}: Unable to detect available schemas, get to {} failed, {}", id, STATE_SCHEMAS_IDENTIFIER, schemasNodeResult.getErrors());
+            return EMPTY;
+        }
+
+        final CompositeNode schemasNode =
+                (CompositeNode) NetconfMessageTransformUtil.findNode(schemasNodeResult.getResult(), DATA_STATE_SCHEMAS_IDENTIFIER);
+        return create(schemasNode);
+    }
+
+    /**
+     * Parse response of get(netconf-state/schemas) to find all schemas under netconf-state/schemas
+     */
+    @VisibleForTesting
+    protected static NetconfStateSchemas create(final CompositeNode schemasNode) {
+        final Set<RemoteYangSchema> availableYangSchemas = Sets.newHashSet();
+
+        for (final CompositeNode schemaNode : schemasNode.getCompositesByName(Schema.QNAME.withoutRevision())) {
+            availableYangSchemas.add(RemoteYangSchema.createFromCompositeNode(schemaNode));
+        }
+
+        return new NetconfStateSchemas(availableYangSchemas);
+    }
+
+    public final static class RemoteYangSchema {
+        private final QName qname;
+
+        private RemoteYangSchema(final QName qname) {
+            this.qname = qname;
+        }
+
+        public QName getQName() {
+            return qname;
+        }
+
+        static RemoteYangSchema createFromCompositeNode(final CompositeNode schemaNode) {
+            Preconditions.checkArgument(schemaNode.getKey().equals(Schema.QNAME.withoutRevision()), "Wrong QName %s", schemaNode.getKey());
+
+            QName childNode = NetconfMessageTransformUtil.IETF_NETCONF_MONITORING_SCHEMA_FORMAT.withoutRevision();
+
+            final String formatAsString = getSingleChildNodeValue(schemaNode, childNode).get();
+            Preconditions.checkArgument(formatAsString.equals(Yang.QNAME.getLocalName()),
+                    "Expecting format to be only %s, not %s", Yang.QNAME.getLocalName(), formatAsString);
+
+            childNode = NetconfMessageTransformUtil.IETF_NETCONF_MONITORING_SCHEMA_LOCATION.withoutRevision();
+            final Set<String> locationsAsString = getAllChildNodeValues(schemaNode, childNode);
+            Preconditions.checkArgument(locationsAsString.contains(Schema.Location.Enumeration.NETCONF.toString()),
+                    "Expecting location to be %s, not %s", Schema.Location.Enumeration.NETCONF.toString(), locationsAsString);
+
+            childNode = NetconfMessageTransformUtil.IETF_NETCONF_MONITORING_SCHEMA_NAMESPACE.withoutRevision();
+            final String namespaceAsString = getSingleChildNodeValue(schemaNode, childNode).get();
+
+            childNode = NetconfMessageTransformUtil.IETF_NETCONF_MONITORING_SCHEMA_VERSION.withoutRevision();
+            // Revision does not have to be filled
+            final Optional<String> revisionAsString = getSingleChildNodeValue(schemaNode, childNode);
+
+            childNode = NetconfMessageTransformUtil.IETF_NETCONF_MONITORING_SCHEMA_IDENTIFIER.withoutRevision();
+            final String moduleNameAsString = getSingleChildNodeValue(schemaNode, childNode).get();
+
+            final QName moduleQName = revisionAsString.isPresent()
+                    ? QName.create(namespaceAsString, revisionAsString.get(), moduleNameAsString)
+                    : QName.create(URI.create(namespaceAsString), null, moduleNameAsString).withoutRevision();
+
+            return new RemoteYangSchema(moduleQName);
+        }
+
+        private static Set<String> getAllChildNodeValues(final CompositeNode schemaNode, final QName childNodeQName) {
+            final Set<String> extractedValues = Sets.newHashSet();
+            for (final SimpleNode<?> childNode : schemaNode.getSimpleNodesByName(childNodeQName)) {
+                extractedValues.add(getValueOfSimpleNode(childNodeQName, childNode).get());
+            }
+            return extractedValues;
+        }
+
+        private static Optional<String> getSingleChildNodeValue(final CompositeNode schemaNode, final QName childNode) {
+            final SimpleNode<?> node = schemaNode.getFirstSimpleByName(childNode);
+            return getValueOfSimpleNode(childNode, node);
+        }
+
+        private static Optional<String> getValueOfSimpleNode(final QName childNode, final SimpleNode<?> node) {
+            Preconditions.checkNotNull(node, "Child node %s not present", childNode);
+            final Object value = node.getValue();
+            return value == null ? Optional.<String>absent() : Optional.of(value.toString().trim());
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            final RemoteYangSchema that = (RemoteYangSchema) o;
+
+            if (!qname.equals(that.qname)) return false;
+
+            return true;
+        }
+
+        @Override
+        public int hashCode() {
+            return qname.hashCode();
+        }
+    }
+}
index 3871cdf..2f24adc 100644 (file)
@@ -229,7 +229,7 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener,
             try {
                 NetconfMessageTransformUtil.checkSuccessReply(message);
             }
-            catch( NetconfDocumentedException e ) {
+            catch(final NetconfDocumentedException e) {
                 logger.warn( "{}: Error reply from remote device, request: {}, response: {}", id,
                              msgToS( request.request ), msgToS( message ), e );
 
index 533df9c..04a9951 100644 (file)
@@ -30,8 +30,6 @@ import org.opendaylight.controller.sal.core.api.RpcImplementation;
 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.opendaylight.yangtools.yang.data.api.SimpleNode;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
@@ -63,7 +61,7 @@ public final class NetconfDeviceReadOnlyTx implements DOMDataReadOnlyTransaction
                 checkReadSuccess(result, path);
 
                 final CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
-                final CompositeNode node = (CompositeNode) findNode(data, path);
+                final CompositeNode node = (CompositeNode) NetconfMessageTransformUtil.findNode(data, path);
 
                 return data == null ?
                         Optional.<NormalizedNode<?, ?>>absent() :
@@ -105,7 +103,7 @@ public final class NetconfDeviceReadOnlyTx implements DOMDataReadOnlyTransaction
                 checkReadSuccess(result, path);
 
                 final CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
-                final CompositeNode node = (CompositeNode) findNode(data, path);
+                final CompositeNode node = (CompositeNode) NetconfMessageTransformUtil.findNode(data, path);
 
                 return data == null ?
                         Optional.<NormalizedNode<?, ?>>absent() :
@@ -116,33 +114,6 @@ public final class NetconfDeviceReadOnlyTx implements DOMDataReadOnlyTransaction
         return MappingCheckedFuture.create(transformedFuture, ReadFailedException.MAPPER);
     }
 
-    private static Node<?> findNode(final CompositeNode node, final YangInstanceIdentifier identifier) {
-
-        Node<?> current = node;
-        for (final YangInstanceIdentifier.PathArgument arg : identifier.getPathArguments()) {
-            if (current instanceof SimpleNode<?>) {
-                return null;
-            } else if (current instanceof CompositeNode) {
-                final CompositeNode currentComposite = (CompositeNode) current;
-
-                current = currentComposite.getFirstCompositeByName(arg.getNodeType());
-                if (current == null) {
-                    current = currentComposite.getFirstCompositeByName(arg.getNodeType().withoutRevision());
-                }
-                if (current == null) {
-                    current = currentComposite.getFirstSimpleByName(arg.getNodeType());
-                }
-                if (current == null) {
-                    current = currentComposite.getFirstSimpleByName(arg.getNodeType().withoutRevision());
-                }
-                if (current == null) {
-                    return null;
-                }
-            }
-        }
-        return current;
-    }
-
     @Override
     public void close() {
         // NOOP
index 47ef903..5e61dfb 100644 (file)
@@ -96,7 +96,7 @@ public class NetconfMessageTransformer implements MessageTransformer<NetconfMess
             return toRpcResult(message, rpc, schemaContext.get());
         } else {
             final CompositeNode node = (CompositeNode) XmlDocumentUtils.toDomNode(message.getDocument());
-            return RpcResultBuilder.success( node ).build();
+            return RpcResultBuilder.success(node).build();
         }
     }
 
index 4f792a0..1e3cf4b 100644 (file)
@@ -27,6 +27,7 @@ import javax.annotation.Nullable;
 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.opendaylight.controller.netconf.util.messages.NetconfMessageUtil;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.NetconfState;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
@@ -34,6 +35,7 @@ import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.api.SimpleNode;
 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
 import org.opendaylight.yangtools.yang.data.impl.NodeFactory;
@@ -49,7 +51,13 @@ public class NetconfMessageTransformUtil {
 
     private NetconfMessageTransformUtil() {}
 
-    public static final QName IETF_NETCONF_MONITORING = QName.create("urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring", "2010-10-04", "ietf-netconf-monitoring");
+    public static final QName IETF_NETCONF_MONITORING = QName.create(NetconfState.QNAME, "ietf-netconf-monitoring");
+    public static final QName IETF_NETCONF_MONITORING_SCHEMA_FORMAT = QName.create(IETF_NETCONF_MONITORING, "format");
+    public static final QName IETF_NETCONF_MONITORING_SCHEMA_LOCATION = QName.create(IETF_NETCONF_MONITORING, "location");
+    public static final QName IETF_NETCONF_MONITORING_SCHEMA_IDENTIFIER = QName.create(IETF_NETCONF_MONITORING, "identifier");
+    public static final QName IETF_NETCONF_MONITORING_SCHEMA_VERSION = QName.create(IETF_NETCONF_MONITORING, "version");
+    public static final QName IETF_NETCONF_MONITORING_SCHEMA_NAMESPACE = QName.create(IETF_NETCONF_MONITORING, "namespace");
+
     public static URI NETCONF_URI = URI.create("urn:ietf:params:xml:ns:netconf:base:1.0");
     public static QName NETCONF_QNAME = QName.create(NETCONF_URI, null, "netconf");
     public static QName NETCONF_DATA_QNAME = QName.create(NETCONF_QNAME, "data");
@@ -365,4 +373,31 @@ public class NetconfMessageTransformUtil {
             return it.toInstance();
         }
     }
+
+    public static Node<?> findNode(final CompositeNode node, final YangInstanceIdentifier identifier) {
+
+        Node<?> current = node;
+        for (final YangInstanceIdentifier.PathArgument arg : identifier.getPathArguments()) {
+            if (current instanceof SimpleNode<?>) {
+                return null;
+            } else if (current instanceof CompositeNode) {
+                final CompositeNode currentComposite = (CompositeNode) current;
+
+                current = currentComposite.getFirstCompositeByName(arg.getNodeType());
+                if (current == null) {
+                    current = currentComposite.getFirstCompositeByName(arg.getNodeType().withoutRevision());
+                }
+                if (current == null) {
+                    current = currentComposite.getFirstSimpleByName(arg.getNodeType());
+                }
+                if (current == null) {
+                    current = currentComposite.getFirstSimpleByName(arg.getNodeType().withoutRevision());
+                }
+                if (current == null) {
+                    return null;
+                }
+            }
+        }
+        return current;
+    }
 }
index 46ea4ff..fa488da 100644 (file)
@@ -15,6 +15,9 @@ import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -23,7 +26,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
@@ -34,6 +36,7 @@ import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
 import org.opendaylight.controller.sal.connect.api.SchemaContextProviderFactory;
 import org.opendaylight.controller.sal.connect.api.SchemaSourceProviderFactory;
 import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc;
 import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
 import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.controller.sal.core.api.RpcImplementation;
@@ -47,10 +50,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
 
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Futures;
-
 public class NetconfDeviceTest {
 
     private static final NetconfMessage netconfMessage;
@@ -71,13 +70,20 @@ public class NetconfDeviceTest {
     public static final String TEST_NAMESPACE = "test:namespace";
     public static final String TEST_MODULE = "test-module";
     public static final String TEST_REVISION = "2013-07-22";
+    private NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver = new NetconfStateSchemas.NetconfStateSchemasResolver() {
+
+        @Override
+        public NetconfStateSchemas resolve(final NetconfDeviceRpc deviceRpc, final NetconfSessionCapabilities remoteSessionCapabilities, final RemoteDeviceId id) {
+            return NetconfStateSchemas.EMPTY;
+        }
+    };
 
     @Test
     public void testNetconfDeviceWithoutMonitoring() throws Exception {
         final RemoteDeviceHandler<NetconfSessionCapabilities> facade = getFacade();
         final RemoteDeviceCommunicator<NetconfMessage> listener = getListener();
 
-        final NetconfDevice device = new NetconfDevice(getId(), facade, getExecutor(), getMessageTransformer(), getSchemaContextProviderFactory(), getSourceProviderFactory());
+        final NetconfDevice device = new NetconfDevice(getId(), facade, getExecutor(), getMessageTransformer(), getSchemaContextProviderFactory(), getSourceProviderFactory(), stateSchemasResolver);
         device.onRemoteSessionUp(getSessionCaps(false, Collections.<String>emptyList()), listener);
 
         Mockito.verify(facade, Mockito.timeout(5000)).onDeviceDisconnected();
@@ -89,7 +95,7 @@ public class NetconfDeviceTest {
         final RemoteDeviceCommunicator<NetconfMessage> listener = getListener();
 
         final MessageTransformer<NetconfMessage> messageTransformer = getMessageTransformer();
-        final NetconfDevice device = new NetconfDevice(getId(), facade, getExecutor(), messageTransformer, getSchemaContextProviderFactory(), getSourceProviderFactory());
+        final NetconfDevice device = new NetconfDevice(getId(), facade, getExecutor(), messageTransformer, getSchemaContextProviderFactory(), getSourceProviderFactory(), stateSchemasResolver);
 
         device.onNotification(netconfMessage);
         device.onNotification(netconfMessage);
@@ -118,7 +124,7 @@ public class NetconfDeviceTest {
         final SchemaSourceProviderFactory<InputStream> sourceProviderFactory = getSourceProviderFactory();
         final MessageTransformer<NetconfMessage> messageTransformer = getMessageTransformer();
 
-        final NetconfDevice device = new NetconfDevice(getId(), facade, getExecutor(), messageTransformer, schemaContextProviderFactory, sourceProviderFactory);
+        final NetconfDevice device = new NetconfDevice(getId(), facade, getExecutor(), messageTransformer, schemaContextProviderFactory, sourceProviderFactory, stateSchemasResolver);
         final NetconfSessionCapabilities sessionCaps = getSessionCaps(true,
                 Lists.newArrayList(TEST_NAMESPACE + "?module=" + TEST_MODULE + "&amp;revision=" + TEST_REVISION));
         device.onRemoteSessionUp(sessionCaps, listener);
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfStateSchemasTest.java b/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfStateSchemasTest.java
new file mode 100644 (file)
index 0000000..16a915e
--- /dev/null
@@ -0,0 +1,29 @@
+package org.opendaylight.controller.sal.connect.netconf;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.matchers.JUnitMatchers.hasItem;
+
+import java.util.Set;
+import org.junit.Test;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
+import org.w3c.dom.Document;
+
+public class NetconfStateSchemasTest {
+
+    @Test
+    public void testCreate() throws Exception {
+        final Document schemasXml = XmlUtil.readXmlToDocument(getClass().getResourceAsStream("/netconf-state.schemas.payload.xml"));
+        final CompositeNode compositeNodeSchemas = (CompositeNode) XmlDocumentUtils.toDomNode(schemasXml);
+        final NetconfStateSchemas schemas = NetconfStateSchemas.create(compositeNodeSchemas);
+
+        final Set<QName> availableYangSchemasQNames = schemas.getAvailableYangSchemasQNames();
+        assertEquals(73, availableYangSchemasQNames.size());
+
+        assertThat(availableYangSchemasQNames,
+                hasItem(QName.create("urn:TBD:params:xml:ns:yang:network-topology", "2013-07-12", "network-topology")));
+    }
+}
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/test/resources/netconf-state.schemas.payload.xml b/opendaylight/md-sal/sal-netconf-connector/src/test/resources/netconf-state.schemas.payload.xml
new file mode 100644 (file)
index 0000000..649ecb7
--- /dev/null
@@ -0,0 +1,514 @@
+<ncm:schemas xmlns:ncm="urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring">
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:threadpool</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>threadpool</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-04-09</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:logback:config</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>config-logging</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-07-16</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:model:statistics:types</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-statistics-types</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-09-25</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:config-dom-store</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-config-dom-datastore</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2014-06-17</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:flow:table:statistics</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-flow-table-statistics</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-12-15</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:meter:service</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>sal-meter</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-09-18</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:config:toaster-provider:impl</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>toaster-provider-impl</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2014-01-31</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:table:types</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-table-types</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-10-26</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:table:service</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>sal-table</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-10-26</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:shutdown</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>shutdown</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-12-18</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:port:service</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>sal-port</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-11-07</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:netty:eventexecutor</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>netty-event-executor</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-11-12</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>sal-remote</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2014-01-14</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:model:topology:view</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-topology-view</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-10-30</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:netty:threadgroup</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>threadgroup</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-11-07</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:TBD:params:xml:ns:yang:network-topology</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>network-topology</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-07-12</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:fixed</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>threadpool-impl-fixed</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-12-01</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-sal-binding-broker-impl</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-10-28</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:ietf:params:xml:ns:yang:ietf-restconf</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>ietf-restconf</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-10-19</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:node:error:service</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>node-error</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2014-04-10</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:flow:errors</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>flow-errors</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-11-16</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:flow:service</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>sal-flow</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-08-19</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:ietf:params:xml:ns:yang:rpc-context</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>rpc-context</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-06-17</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:operational-dom-store
+        </ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-operational-dom-datastore</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2014-06-17</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:flow:types:queue</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-queue-types</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-09-25</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>ietf-netconf-monitoring</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2010-10-04</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:netconf-node-inventory</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>netconf-node-inventory</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2014-01-08</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:ietf:params:xml:ns:yang:ietf-yang-types</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>ietf-yang-types</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-07-15</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:meter:statistics</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-meter-statistics</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-11-11</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:flow:inventory</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>flow-node-inventory</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-08-19</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>odl-sal-netconf-connector-cfg</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-10-28</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:scheduled</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>threadpool-impl-scheduled</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-12-01</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:TBD:params:xml:ns:yang:network-topology</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>network-topology</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-10-21</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>http://netconfcentral.org/ns/toaster</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>toaster</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2009-11-20</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:config:netconf</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>odl-netconf-cfg</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2014-04-08</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:meter:types</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-meter-types</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-09-18</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:impl</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-sal-dom-broker-impl</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-10-28</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:flow:topology:discovery</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>flow-topology-discovery</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-08-19</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:yang:extension:yang-ext</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>yang-ext</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-07-09</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>threadpool-impl</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-04-05</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:flow:types:port</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-port-types</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-09-25</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-md-sal-binding</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-10-28</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:packet:service</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>packet-processing</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-07-09</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>threadpool-impl-flexible</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-12-01</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:queue:service</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>sal-queue</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-11-07</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:ietf:params:xml:ns:yang:ietf-inet-types</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>ietf-inet-types</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2010-09-24</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:md:sal:rest:connector</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-rest-connector</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2014-07-24</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:flow:transaction</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>flow-capable-transaction</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-11-03</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:flow:statistics</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-flow-statistics</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-08-19</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:protocol:framework</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>protocol-framework</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2014-03-13</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:model:match:types</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-match-types</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-10-26</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:ietf:params:xml:ns:yang:ietf-yang-types</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>ietf-yang-types</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2010-09-24</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:group:service</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>sal-group</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-09-18</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:inmemory-datastore-provider</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-inmemory-datastore-provider</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2014-06-17</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:netty:timer</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>netty-timer</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-11-19</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:group:statistics</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-group-statistics</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-11-11</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:config</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>config</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-04-05</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:config:netconf:client:dispatcher</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>odl-netconfig-client-cfg</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2014-04-08</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:l2:types</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-l2-types</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-08-27</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:action:types</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-action-types</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-11-12</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-md-sal-dom</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-10-28</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:md:sal:common</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-md-sal-common</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-10-28</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:group:types</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-group-types</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-10-18</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring-extension</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>ietf-netconf-monitoring-extension</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-12-10</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:inventory</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-inventory</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-08-19</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:netty</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>netty</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-11-19</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:model:topology:general</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-topology</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-10-30</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:port:statistics</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-port-statistics</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version></ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:queue:statistics</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-queue-statistics</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-12-16</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:config:kitchen-service:impl</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>kitchen-service-impl</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2014-01-31</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:flow:types</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-flow-types</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-10-26</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:params:xml:ns:yang:controller:shutdown:impl</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>shutdown-impl</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-12-18</ncm:version>
+    </ncm:schema>
+    <ncm:schema>
+        <ncm:namespace>urn:opendaylight:model:topology:inventory</ncm:namespace>
+        <ncm:location>NETCONF</ncm:location>
+        <ncm:identifier>opendaylight-topology-inventory</ncm:identifier>
+        <ncm:format>yang</ncm:format>
+        <ncm:version>2013-10-30</ncm:version>
+    </ncm:schema>
+</ncm:schemas>
\ No newline at end of file
index a2bee8f..674c5bf 100644 (file)
       <groupId>com.typesafe.akka</groupId>
       <artifactId>akka-osgi_${scala.version}</artifactId>
     </dependency>
+
+  <dependency>
+     <groupId>com.typesafe.akka</groupId>
+     <artifactId>akka-slf4j_${scala.version}</artifactId>
+  </dependency>
     <!-- SAL Dependencies -->
 
     <dependency>
       <scope>test</scope>
     </dependency>
 
-    <dependency>
+      <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-simple</artifactId>
       <version>${slf4j.version}</version>
index bd49b62..f1ca3cc 100644 (file)
@@ -27,7 +27,6 @@ public class ActorSystemFactory {
    * @param bundleContext
    */
   public static final void createInstance(final BundleContext bundleContext) {
-
     if(actorSystem == null) {
       // Create an OSGi bundle classloader for actor system
       BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
@@ -35,8 +34,8 @@ public class ActorSystemFactory {
       synchronized (ActorSystemFactory.class) {
         // Double check
         if (actorSystem == null) {
-          ActorSystem system = ActorSystem.create("opendaylight-rpc",
-              ConfigFactory.load().getConfig("odl-cluster"), classLoader);
+          ActorSystem system = ActorSystem.create("opendaylight-cluster-rpc",
+              ConfigFactory.load().getConfig("odl-cluster-rpc"), classLoader);
           actorSystem = system;
         }
       }
index 5c56455..514a2f1 100644 (file)
@@ -17,7 +17,7 @@ import akka.japi.Creator;
 import akka.japi.Function;
 import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
 import org.opendaylight.controller.remote.rpc.registry.ClusterWrapper;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistryOld;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
 import org.opendaylight.yangtools.yang.common.QName;
@@ -72,7 +72,7 @@ public class RpcManager extends AbstractUntypedActor {
   private void createRpcActors() {
     LOG.debug("Create rpc registry and broker actors");
 
-    rpcRegistry = getContext().actorOf(RpcRegistry.props(clusterWrapper), ActorConstants.RPC_REGISTRY);
+    rpcRegistry = getContext().actorOf(RpcRegistryOld.props(clusterWrapper), ActorConstants.RPC_REGISTRY);
     rpcBroker = getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), ActorConstants.RPC_BROKER);
   }
 
index 5e19653..c25aa52 100644 (file)
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.remote.rpc.registry;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import akka.actor.ActorRef;
+import akka.japi.Option;
+import akka.japi.Pair;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Copier;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
 
+import java.io.Serializable;
 import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-public class RoutingTable<I, R> {
-
-  private final Logger LOG = LoggerFactory.getLogger(RoutingTable.class);
-
-  private ConcurrentMap<I,R> globalRpcMap = new ConcurrentHashMap<>();
-  private ConcurrentMap<I, LinkedHashSet<R>> routedRpcMap = new ConcurrentHashMap<>();
-
-  public ConcurrentMap<I, R> getGlobalRpcMap() {
-    return globalRpcMap;
-  }
-
-  public ConcurrentMap<I, LinkedHashSet<R>> getRoutedRpcMap() {
-    return routedRpcMap;
-  }
-
-  public R getGlobalRoute(final I routeId) {
-    Preconditions.checkNotNull(routeId, "getGlobalRoute: routeId cannot be null!");
-    return globalRpcMap.get(routeId);
-  }
-
-  public void addGlobalRoute(final I routeId, final R route) {
-    Preconditions.checkNotNull(routeId, "addGlobalRoute: routeId cannot be null!");
-    Preconditions.checkNotNull(route, "addGlobalRoute: route cannot be null!");
-    LOG.debug("addGlobalRoute: adding  a new route with id[{}] and value [{}]", routeId, route);
-    if(globalRpcMap.putIfAbsent(routeId, route) != null) {
-      LOG.debug("A route already exist for route id [{}] ", routeId);
-    }
-  }
+import java.util.HashMap;
+import java.util.Map;
 
-  public void removeGlobalRoute(final I routeId) {
-    Preconditions.checkNotNull(routeId, "removeGlobalRoute: routeId cannot be null!");
-    LOG.debug("removeGlobalRoute: removing  a new route with id [{}]", routeId);
-    globalRpcMap.remove(routeId);
-  }
+public class RoutingTable implements Copier<RoutingTable>, Serializable {
 
-  public Set<R> getRoutedRpc(final I routeId) {
-    Preconditions.checkNotNull(routeId, "getRoutes: routeId cannot be null!");
-    Set<R> routes = routedRpcMap.get(routeId);
-
-    if (routes == null) {
-      return Collections.emptySet();
-    }
+    private Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> table = new HashMap<>();
+    private ActorRef router;
 
-    return ImmutableSet.copyOf(routes);
-  }
+    @Override
+    public RoutingTable copy() {
+        RoutingTable copy = new RoutingTable();
+        copy.setTable(Collections.unmodifiableMap(table));
+        copy.setRouter(this.getRouter());
 
-  public R getLastAddedRoutedRpc(final I routeId) {
+        return copy;
+    }
 
-    Set<R> routes = getRoutedRpc(routeId);
+    public Option<Pair<ActorRef, Long>> getRouterFor(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
+        Long updatedTime = table.get(routeId);
 
-    if (routes.isEmpty()) {
-      return null;
+        if (updatedTime == null || router == null)
+            return Option.none();
+        else
+            return Option.option(new Pair<>(router, updatedTime));
     }
 
-    R route = null;
-    Iterator<R> iter = routes.iterator();
-    while (iter.hasNext()) {
-      route = iter.next();
+    public void addRoute(RpcRouter.RouteIdentifier<?,?,?> routeId){
+        table.put(routeId, System.currentTimeMillis());
     }
 
-    return route;
-  }
-
-  public void addRoutedRpc(final I routeId, final R route)   {
-    Preconditions.checkNotNull(routeId, "addRoute: routeId cannot be null");
-    Preconditions.checkNotNull(route, "addRoute: route cannot be null");
-    LOG.debug("addRoute: adding a route with k/v [{}/{}]", routeId, route);
-    threadSafeAdd(routeId, route);
-  }
-
-  public void addRoutedRpcs(final Set<I> routeIds, final R route) {
-    Preconditions.checkNotNull(routeIds, "addRoutes: routeIds must not be null");
-    for (I routeId : routeIds){
-      addRoutedRpc(routeId, route);
+    public void removeRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
+        table.remove(routeId);
     }
-  }
 
-  public void removeRoute(final I routeId, final R route) {
-    Preconditions.checkNotNull(routeId, "removeRoute: routeId cannot be null!");
-    Preconditions.checkNotNull(route, "removeRoute: route cannot be null!");
-
-    LinkedHashSet<R> routes = routedRpcMap.get(routeId);
-    if (routes == null) {
-      return;
-    }
-    LOG.debug("removeRoute: removing  a new route with k/v [{}/{}]", routeId, route);
-    threadSafeRemove(routeId, route);
-  }
-
-  public void removeRoutes(final Set<I> routeIds, final R route) {
-    Preconditions.checkNotNull(routeIds, "removeRoutes: routeIds must not be null");
-    for (I routeId : routeIds){
-      removeRoute(routeId, route);
+    public Boolean contains(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
+        return table.containsKey(routeId);
     }
-  }
-
-  /**
-   * This method guarantees that no 2 thread over write each other's changes.
-   * Just so that we dont end up in infinite loop, it tries for 100 times then throw
-   */
-  private void threadSafeAdd(final I routeId, final R route) {
 
-    for (int i=0;i<100;i++){
+    ///
+    /// Getter, Setters
+    ///
+    //TODO: Remove public
+    public Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> getTable() {
+        return table;
+    }
 
-      LinkedHashSet<R> updatedRoutes = new LinkedHashSet<>();
-      updatedRoutes.add(route);
-      LinkedHashSet<R> oldRoutes = routedRpcMap.putIfAbsent(routeId, updatedRoutes);
-      if (oldRoutes == null) {
-        return;
-      }
+    void setTable(Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> table) {
+        this.table = table;
+    }
 
-      updatedRoutes = new LinkedHashSet<>(oldRoutes);
-      updatedRoutes.add(route);
+    public ActorRef getRouter() {
+        return router;
+    }
 
-      if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) {
-        return;
-      }
+    public void setRouter(ActorRef router) {
+        this.router = router;
     }
-    //the method did not already return means it failed to add route in 100 attempts
-    throw new IllegalStateException("Failed to add route [" + routeId + "]");
-  }
-
-  /**
-   * This method guarantees that no 2 thread over write each other's changes.
-   * Just so that we dont end up in infinite loop, it tries for 100 times then throw
-   */
-  private void threadSafeRemove(final I routeId, final R route) {
-    LinkedHashSet<R> updatedRoutes = null;
-    for (int i=0;i<100;i++){
-      LinkedHashSet<R> oldRoutes = routedRpcMap.get(routeId);
-
-      // if route to be deleted is the only entry in the set then remove routeId from the cache
-      if ((oldRoutes.size() == 1) && oldRoutes.contains(route)){
-        routedRpcMap.remove(routeId);
-        return;
-      }
-
-      // if there are multiple routes for this routeId, remove the route to be deleted only from the set.
-      updatedRoutes = new LinkedHashSet<>(oldRoutes);
-      updatedRoutes.remove(route);
-      if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) {
-        return;
-      }
 
+    @Override
+    public String toString() {
+        return "RoutingTable{" +
+                "table=" + table +
+                ", router=" + router +
+                '}';
     }
-    //the method did not already return means it failed to remove route in 100 attempts
-    throw new IllegalStateException("Failed to remove route [" + routeId + "]");
-  }
 }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableOld.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableOld.java
new file mode 100644 (file)
index 0000000..5951776
--- /dev/null
@@ -0,0 +1,171 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.registry;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class RoutingTableOld<I, R> {
+
+  private final Logger LOG = LoggerFactory.getLogger(RoutingTableOld.class);
+
+  private ConcurrentMap<I,R> globalRpcMap = new ConcurrentHashMap<>();
+  private ConcurrentMap<I, LinkedHashSet<R>> routedRpcMap = new ConcurrentHashMap<>();
+
+  public ConcurrentMap<I, R> getGlobalRpcMap() {
+    return globalRpcMap;
+  }
+
+  public ConcurrentMap<I, LinkedHashSet<R>> getRoutedRpcMap() {
+    return routedRpcMap;
+  }
+
+  public R getGlobalRoute(final I routeId) {
+    Preconditions.checkNotNull(routeId, "getGlobalRoute: routeId cannot be null!");
+    return globalRpcMap.get(routeId);
+  }
+
+  public void addGlobalRoute(final I routeId, final R route) {
+    Preconditions.checkNotNull(routeId, "addGlobalRoute: routeId cannot be null!");
+    Preconditions.checkNotNull(route, "addGlobalRoute: route cannot be null!");
+    LOG.debug("addGlobalRoute: adding  a new route with id[{}] and value [{}]", routeId, route);
+    if(globalRpcMap.putIfAbsent(routeId, route) != null) {
+      LOG.debug("A route already exist for route id [{}] ", routeId);
+    }
+  }
+
+  public void removeGlobalRoute(final I routeId) {
+    Preconditions.checkNotNull(routeId, "removeGlobalRoute: routeId cannot be null!");
+    LOG.debug("removeGlobalRoute: removing  a new route with id [{}]", routeId);
+    globalRpcMap.remove(routeId);
+  }
+
+  public Set<R> getRoutedRpc(final I routeId) {
+    Preconditions.checkNotNull(routeId, "getRoutes: routeId cannot be null!");
+    Set<R> routes = routedRpcMap.get(routeId);
+
+    if (routes == null) {
+      return Collections.emptySet();
+    }
+
+    return ImmutableSet.copyOf(routes);
+  }
+
+  public R getLastAddedRoutedRpc(final I routeId) {
+
+    Set<R> routes = getRoutedRpc(routeId);
+
+    if (routes.isEmpty()) {
+      return null;
+    }
+
+    R route = null;
+    Iterator<R> iter = routes.iterator();
+    while (iter.hasNext()) {
+      route = iter.next();
+    }
+
+    return route;
+  }
+
+  public void addRoutedRpc(final I routeId, final R route)   {
+    Preconditions.checkNotNull(routeId, "addRoute: routeId cannot be null");
+    Preconditions.checkNotNull(route, "addRoute: route cannot be null");
+    LOG.debug("addRoute: adding a route with k/v [{}/{}]", routeId, route);
+    threadSafeAdd(routeId, route);
+  }
+
+  public void addRoutedRpcs(final Set<I> routeIds, final R route) {
+    Preconditions.checkNotNull(routeIds, "addRoutes: routeIds must not be null");
+    for (I routeId : routeIds){
+      addRoutedRpc(routeId, route);
+    }
+  }
+
+  public void removeRoute(final I routeId, final R route) {
+    Preconditions.checkNotNull(routeId, "removeRoute: routeId cannot be null!");
+    Preconditions.checkNotNull(route, "removeRoute: route cannot be null!");
+
+    LinkedHashSet<R> routes = routedRpcMap.get(routeId);
+    if (routes == null) {
+      return;
+    }
+    LOG.debug("removeRoute: removing  a new route with k/v [{}/{}]", routeId, route);
+    threadSafeRemove(routeId, route);
+  }
+
+  public void removeRoutes(final Set<I> routeIds, final R route) {
+    Preconditions.checkNotNull(routeIds, "removeRoutes: routeIds must not be null");
+    for (I routeId : routeIds){
+      removeRoute(routeId, route);
+    }
+  }
+
+  /**
+   * This method guarantees that no 2 thread over write each other's changes.
+   * Just so that we dont end up in infinite loop, it tries for 100 times then throw
+   */
+  private void threadSafeAdd(final I routeId, final R route) {
+
+    for (int i=0;i<100;i++){
+
+      LinkedHashSet<R> updatedRoutes = new LinkedHashSet<>();
+      updatedRoutes.add(route);
+      LinkedHashSet<R> oldRoutes = routedRpcMap.putIfAbsent(routeId, updatedRoutes);
+      if (oldRoutes == null) {
+        return;
+      }
+
+      updatedRoutes = new LinkedHashSet<>(oldRoutes);
+      updatedRoutes.add(route);
+
+      if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) {
+        return;
+      }
+    }
+    //the method did not already return means it failed to add route in 100 attempts
+    throw new IllegalStateException("Failed to add route [" + routeId + "]");
+  }
+
+  /**
+   * This method guarantees that no 2 thread over write each other's changes.
+   * Just so that we dont end up in infinite loop, it tries for 100 times then throw
+   */
+  private void threadSafeRemove(final I routeId, final R route) {
+    LinkedHashSet<R> updatedRoutes = null;
+    for (int i=0;i<100;i++){
+      LinkedHashSet<R> oldRoutes = routedRpcMap.get(routeId);
+
+      // if route to be deleted is the only entry in the set then remove routeId from the cache
+      if ((oldRoutes.size() == 1) && oldRoutes.contains(route)){
+        routedRpcMap.remove(routeId);
+        return;
+      }
+
+      // if there are multiple routes for this routeId, remove the route to be deleted only from the set.
+      updatedRoutes = new LinkedHashSet<>(oldRoutes);
+      updatedRoutes.remove(route);
+      if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) {
+        return;
+      }
+
+    }
+    //the method did not already return means it failed to remove route in 100 attempts
+    throw new IllegalStateException("Failed to remove route [" + routeId + "]");
+  }
+}
index e36060c..5160987 100644 (file)
  */
 package org.opendaylight.controller.remote.rpc.registry;
 
-import akka.actor.ActorSelection;
+import akka.actor.ActorRef;
 import akka.actor.Address;
 import akka.actor.Props;
-import akka.cluster.ClusterEvent;
-import akka.cluster.Member;
-import akka.japi.Creator;
-import org.opendaylight.controller.remote.rpc.AbstractUntypedActor;
-import org.opendaylight.controller.remote.rpc.ActorConstants;
-import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.AddRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
-import org.opendaylight.controller.remote.rpc.messages.GetRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
-import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.RemoveRpc;
-import org.opendaylight.controller.remote.rpc.messages.RoutingTableData;
+import akka.actor.UntypedActor;
+import akka.dispatch.Mapper;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.japi.Option;
+import akka.japi.Pair;
+import akka.pattern.Patterns;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.JavaConversions;
+import scala.concurrent.Future;
 
-import java.util.LinkedHashSet;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoute;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoute;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
 
 /**
- * This Actor maintains the routing table state and sync it with other nodes in the cluster.
- *
- * A scheduler runs after an interval of time, which pick a random member from the cluster
- * and send the current state of routing table to the member.
+ * Registry to look up cluster nodes that have registered for a given rpc.
+ * <p>
+ * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
+ * cluster wide information.
  *
- * when a message of routing table data is received, it gets merged with the local routing table
- * to keep the latest data.
  */
+public class RpcRegistry extends UntypedActor {
+
+    final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+
+    /**
+     * Store to keep the registry. Bucket store sync's it across nodes in the cluster
+     */
+    private ActorRef bucketStore;
+
+    /**
+     * Rpc broker that would use the registry to route requests.
+     */
+    private ActorRef localRouter;
 
-public class RpcRegistry extends AbstractUntypedActor {
-
-  private static final Logger LOG = LoggerFactory.getLogger(RpcRegistry.class);
-  private RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String> routingTable;
-  private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
-  private final ClusterWrapper clusterWrapper;
-  private final ScheduledFuture<?> syncScheduler;
-
-  private RpcRegistry(ClusterWrapper clusterWrapper){
-    this.routingTable = new RoutingTable<>();
-    this.clusterWrapper = clusterWrapper;
-    this.syncScheduler = scheduler.scheduleAtFixedRate(new SendRoutingTable(), 10, 10, TimeUnit.SECONDS);
-  }
-
-  public static Props props(final ClusterWrapper clusterWrapper){
-    return Props.create(new Creator<RpcRegistry>(){
-
-      @Override
-      public RpcRegistry create() throws Exception {
-        return new RpcRegistry(clusterWrapper);
-      }
-    });
-  }
-
-  @Override
-  protected void handleReceive(Object message) throws Exception {
-    LOG.debug("Received message {}", message);
-    if(message instanceof RoutingTableData) {
-      syncRoutingTable((RoutingTableData) message);
-    } else if(message instanceof GetRoutedRpc) {
-      getRoutedRpc((GetRoutedRpc) message);
-    } else if(message instanceof GetRpc) {
-      getRpc((GetRpc) message);
-    } else if(message instanceof AddRpc) {
-      addRpc((AddRpc) message);
-    } else if(message instanceof RemoveRpc) {
-      removeRpc((RemoveRpc) message);
-    } else if(message instanceof AddRoutedRpc) {
-      addRoutedRpc((AddRoutedRpc) message);
-    } else if(message instanceof RemoveRoutedRpc) {
-      removeRoutedRpc((RemoveRoutedRpc) message);
+    public RpcRegistry() {
+        bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
     }
-  }
 
-  private void getRoutedRpc(GetRoutedRpc rpcMsg){
-    LOG.debug("Get latest routed Rpc location from routing table {}", rpcMsg);
-    String remoteActorPath = routingTable.getLastAddedRoutedRpc(rpcMsg.getRouteId());
-    GetRoutedRpcReply routedRpcReply = new GetRoutedRpcReply(remoteActorPath);
+    public RpcRegistry(ActorRef bucketStore) {
+        this.bucketStore = bucketStore;
+    }
+
+    @Override
+    public void onReceive(Object message) throws Exception {
 
-    getSender().tell(routedRpcReply, self());
-  }
+        log.debug("Received message: message [{}]", message);
 
-  private void getRpc(GetRpc rpcMsg) {
-    LOG.debug("Get global Rpc location from routing table {}", rpcMsg);
-    String remoteActorPath = routingTable.getGlobalRoute(rpcMsg.getRouteId());
-    GetRpcReply rpcReply = new GetRpcReply(remoteActorPath);
+        //TODO: if sender is remote, reject message
 
-    getSender().tell(rpcReply, self());
-  }
+        if (message instanceof SetLocalRouter)
+            receiveSetLocalRouter((SetLocalRouter) message);
 
-  private void addRpc(AddRpc rpcMsg) {
-    LOG.debug("Add Rpc to routing table {}", rpcMsg);
-    routingTable.addGlobalRoute(rpcMsg.getRouteId(), rpcMsg.getActorPath());
+        if (message instanceof AddOrUpdateRoute)
+            receiveAddRoute((AddOrUpdateRoute) message);
 
-    getSender().tell("Success", self());
-  }
+        else if (message instanceof RemoveRoute)
+            receiveRemoveRoute((RemoveRoute) message);
 
-  private void removeRpc(RemoveRpc rpcMsg) {
-    LOG.debug("Removing Rpc to routing table {}", rpcMsg);
-    routingTable.removeGlobalRoute(rpcMsg.getRouteId());
+        else if (message instanceof Messages.FindRouters)
+            receiveGetRouter((Messages.FindRouters) message);
+
+        else
+            unhandled(message);
+    }
 
-    getSender().tell("Success", self());
-  }
+    /**
+     * Register's rpc broker
+     *
+     * @param message contains {@link akka.actor.ActorRef} for rpc broker
+     */
+    private void receiveSetLocalRouter(SetLocalRouter message) {
+        if (message == null || message.getRouter() == null)
+            return;//ignore
 
-  private void addRoutedRpc(AddRoutedRpc rpcMsg) {
-    routingTable.addRoutedRpcs(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
-    getSender().tell("Success", self());
-  }
+        localRouter = message.getRouter();
+    }
 
-  private void removeRoutedRpc(RemoveRoutedRpc rpcMsg) {
-    routingTable.removeRoutes(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
-    getSender().tell("Success", self());
-  }
+    /**
+     * //TODO: update this to accept multiple route registration
+     * @param msg
+     */
+    private void receiveAddRoute(AddOrUpdateRoute msg) {
+        if (msg.getRouteIdentifier() == null)
+            return;//ignore
 
-  private void syncRoutingTable(RoutingTableData routingTableData) {
-    LOG.debug("Syncing routing table {}", routingTableData);
+        Preconditions.checkState(localRouter != null, "Router must be set first");
 
-    Map<RpcRouter.RouteIdentifier<?, ?, ?>, String> newRpcMap = routingTableData.getRpcMap();
-    Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = newRpcMap.keySet();
-    for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
-      routingTable.addGlobalRoute(routeId, newRpcMap.get(routeId));
+        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
+        futureReply.map(getMapperToAddRoute(msg.getRouteIdentifier()), getContext().dispatcher());
     }
 
-    Map<RpcRouter.RouteIdentifier<?, ?, ?>, LinkedHashSet<String>> newRoutedRpcMap =
-        routingTableData.getRoutedRpcMap();
-    routeIds = newRoutedRpcMap.keySet();
+    /**
+     * //TODO: update this to accept multiple routes
+     * @param msg
+     */
+    private void receiveRemoveRoute(RemoveRoute msg) {
+        if (msg.getRouteIdentifier() == null)
+            return;//ignore
+
+        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
+        futureReply.map(getMapperToRemoveRoute(msg.getRouteIdentifier()), getContext().dispatcher());
 
-    for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
-      Set<String> routeAddresses = newRoutedRpcMap.get(routeId);
-      for(String routeAddress : routeAddresses) {
-        routingTable.addRoutedRpc(routeId, routeAddress);
-      }
     }
-  }
-
-  private ActorSelection getRandomRegistryActor() {
-    ClusterEvent.CurrentClusterState clusterState = clusterWrapper.getState();
-    ActorSelection actor = null;
-    Set<Member> members = JavaConversions.asJavaSet(clusterState.members());
-    int memberSize = members.size();
-    // Don't select yourself
-    if(memberSize > 1) {
-      Address currentNodeAddress = clusterWrapper.getAddress();
-      int index = new Random().nextInt(memberSize);
-      int i = 0;
-      // keeping previous member, in case when random index member is same as current actor
-      // and current actor member is last in set
-      Member previousMember = null;
-      for(Member member : members){
-        if(i == index-1) {
-          previousMember = member;
+
+    /**
+     * Finds routers for the given rpc.
+     * @param msg
+     */
+    private void receiveGetRouter(Messages.FindRouters msg) {
+        final ActorRef sender = getSender();
+
+        //if empty message, return empty list
+        if (msg.getRouteIdentifier() == null) {
+            sender.tell(createEmptyReply(), getSelf());
+            return;
         }
-        if(i == index) {
-          if(!currentNodeAddress.equals(member.address())) {
-            actor = this.context().actorSelection(member.address() + ActorConstants.RPC_REGISTRY_PATH);
-            break;
-          } else if(index < memberSize-1){ // pick the next element in the set
-            index++;
-          }
+
+        Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), 1000);
+        futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
+
+    }
+
+    /**
+     * Helper to create empty reply when no routers are found
+     *
+     * @return
+     */
+    private Messages.FindRoutersReply createEmptyReply() {
+        List<Pair<ActorRef, Long>> routerWithUpdateTime = Collections.emptyList();
+        return new Messages.FindRoutersReply(routerWithUpdateTime);
+    }
+
+    /**
+     * Helper to create a reply when routers are found for the given rpc
+     * @param buckets
+     * @param routeId
+     * @return
+     */
+    private Messages.FindRoutersReply createReplyWithRouters(Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
+
+        List<Pair<ActorRef, Long>> routers = new ArrayList<>();
+
+        Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
+
+        for (Bucket bucket : buckets.values()) {
+
+            RoutingTable table = (RoutingTable) bucket.getData();
+
+            if (table == null)
+                continue;
+
+            routerWithUpdateTime = table.getRouterFor(routeId);
+
+            if (routerWithUpdateTime.isEmpty())
+                continue;
+
+            routers.add(routerWithUpdateTime.get());
         }
-        i++;
-      }
-      if(actor == null && previousMember != null) {
-        actor = this.context().actorSelection(previousMember.address() + ActorConstants.RPC_REGISTRY_PATH);
-      }
+
+        return new Messages.FindRoutersReply(routers);
     }
-    return actor;
-  }
 
-  private class SendRoutingTable implements Runnable {
 
-    @Override
-    public void run() {
-      RoutingTableData routingTableData =
-          new RoutingTableData(routingTable.getGlobalRpcMap(), routingTable.getRoutedRpcMap());
-      LOG.debug("Sending routing table for sync {}", routingTableData);
-      ActorSelection actor = getRandomRegistryActor();
-      if(actor != null) {
-        actor.tell(routingTableData, self());
-      }
+    ///
+    ///private factories to create Mapper
+    ///
+
+    /**
+     *  Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
+     *
+     * @param routeId the rpc
+     * @param sender  client who asked to find the routers.
+     * @return
+     */
+    private Mapper<Object, Void> getMapperToGetRouter(final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
+        return new Mapper<Object, Void>() {
+            @Override
+            public Void apply(Object replyMessage) {
+
+                if (replyMessage instanceof GetAllBucketsReply) {
+
+                    GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
+                    Map<Address, Bucket> buckets = reply.getBuckets();
+
+                    if (buckets == null || buckets.isEmpty()) {
+                        sender.tell(createEmptyReply(), getSelf());
+                        return null;
+                    }
+
+                    sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
+     * it updates the local bucket in bucket store.
+     *
+     * @param routeId rpc to remote
+     * @return
+     */
+    private Mapper<Object, Void> getMapperToRemoveRoute(final RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
+        return new Mapper<Object, Void>() {
+            @Override
+            public Void apply(Object replyMessage) {
+                if (replyMessage instanceof GetLocalBucketReply) {
+
+                    GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
+                    Bucket<RoutingTable> bucket = reply.getBucket();
+
+                    if (bucket == null) {
+                        log.debug("Local bucket is null");
+                        return null;
+                    }
+
+                    RoutingTable table = bucket.getData();
+                    if (table == null)
+                        table = new RoutingTable();
+
+                    table.setRouter(localRouter);
+                    table.removeRoute(routeId);
+
+                    bucket.setData(table);
+
+                    UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
+                    bucketStore.tell(updateBucketMessage, getSelf());
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
+     * it updates the local bucket in bucket store.
+     *
+     * @param routeId rpc to add
+     * @return
+     */
+    private Mapper<Object, Void> getMapperToAddRoute(final RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
+
+        return new Mapper<Object, Void>() {
+            @Override
+            public Void apply(Object replyMessage) {
+                if (replyMessage instanceof GetLocalBucketReply) {
+
+                    GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
+                    Bucket<RoutingTable> bucket = reply.getBucket();
+
+                    if (bucket == null) {
+                        log.debug("Local bucket is null");
+                        return null;
+                    }
+
+                    RoutingTable table = bucket.getData();
+                    if (table == null)
+                        table = new RoutingTable();
+
+                    table.setRouter(localRouter);
+                    table.addRoute(routeId);
+
+                    bucket.setData(table);
+
+                    UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
+                    bucketStore.tell(updateBucketMessage, getSelf());
+                }
+
+                return null;
+            }
+        };
+    }
+
+    /**
+     * All messages used by the RpcRegistry
+     */
+    public static class Messages {
+
+
+        public static class ContainsRoute {
+            final RpcRouter.RouteIdentifier<?,?,?> routeIdentifier;
+
+            public ContainsRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
+                Preconditions.checkArgument(routeIdentifier != null);
+                this.routeIdentifier = routeIdentifier;
+            }
+
+            public RpcRouter.RouteIdentifier<?,?,?> getRouteIdentifier(){
+                return this.routeIdentifier;
+            }
+
+            @Override
+            public String toString() {
+                return this.getClass().getSimpleName() + "{" +
+                        "routeIdentifier=" + routeIdentifier +
+                        '}';
+            }
+        }
+
+        public static class AddOrUpdateRoute extends ContainsRoute{
+
+            public AddOrUpdateRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
+                super(routeIdentifier);
+            }
+        }
+
+        public static class RemoveRoute extends ContainsRoute {
+
+            public RemoveRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
+                super(routeIdentifier);
+            }
+        }
+
+        public static class SetLocalRouter{
+            private final ActorRef router;
+
+            public SetLocalRouter(ActorRef router) {
+                this.router = router;
+            }
+
+            public ActorRef getRouter(){
+                return this.router;
+            }
+
+            @Override
+            public String toString() {
+                return "SetLocalRouter{" +
+                        "router=" + router +
+                        '}';
+            }
+        }
+
+        public static class FindRouters extends ContainsRoute {
+            public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
+                super(routeIdentifier);
+            }
+        }
+
+        public static class FindRoutersReply {
+            final List<Pair<ActorRef, Long>> routerWithUpdateTime;
+
+            public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
+                this.routerWithUpdateTime = routerWithUpdateTime;
+            }
+
+            public List<Pair<ActorRef, Long>> getRouterWithUpdateTime(){
+                return routerWithUpdateTime;
+            }
+
+            @Override
+            public String toString() {
+                return "FindRoutersReply{" +
+                        "routerWithUpdateTime=" + routerWithUpdateTime +
+                        '}';
+            }
+        }
     }
-  }
 }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryOld.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryOld.java
new file mode 100644 (file)
index 0000000..96c8802
--- /dev/null
@@ -0,0 +1,203 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry;
+
+import akka.actor.ActorSelection;
+import akka.actor.Address;
+import akka.actor.Props;
+import akka.cluster.ClusterEvent;
+import akka.cluster.Member;
+import akka.japi.Creator;
+import org.opendaylight.controller.remote.rpc.AbstractUntypedActor;
+import org.opendaylight.controller.remote.rpc.ActorConstants;
+import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.AddRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
+import org.opendaylight.controller.remote.rpc.messages.GetRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
+import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.RemoveRpc;
+import org.opendaylight.controller.remote.rpc.messages.RoutingTableData;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConversions;
+
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This Actor maintains the routing table state and sync it with other nodes in the cluster.
+ *
+ * A scheduler runs after an interval of time, which pick a random member from the cluster
+ * and send the current state of routing table to the member.
+ *
+ * when a message of routing table data is received, it gets merged with the local routing table
+ * to keep the latest data.
+ */
+
+public class RpcRegistryOld extends AbstractUntypedActor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RpcRegistryOld.class);
+  private RoutingTableOld<RpcRouter.RouteIdentifier<?, ?, ?>, String> routingTable;
+  private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+  private final ClusterWrapper clusterWrapper;
+  private final ScheduledFuture<?> syncScheduler;
+
+  private RpcRegistryOld(ClusterWrapper clusterWrapper){
+    this.routingTable = new RoutingTableOld<>();
+    this.clusterWrapper = clusterWrapper;
+    this.syncScheduler = scheduler.scheduleAtFixedRate(new SendRoutingTable(), 10, 10, TimeUnit.SECONDS);
+  }
+
+  public static Props props(final ClusterWrapper clusterWrapper){
+    return Props.create(new Creator<RpcRegistryOld>(){
+
+      @Override
+      public RpcRegistryOld create() throws Exception {
+        return new RpcRegistryOld(clusterWrapper);
+      }
+    });
+  }
+
+  @Override
+  protected void handleReceive(Object message) throws Exception {
+    LOG.debug("Received message {}", message);
+    if(message instanceof RoutingTableData) {
+      syncRoutingTable((RoutingTableData) message);
+    } else if(message instanceof GetRoutedRpc) {
+      getRoutedRpc((GetRoutedRpc) message);
+    } else if(message instanceof GetRpc) {
+      getRpc((GetRpc) message);
+    } else if(message instanceof AddRpc) {
+      addRpc((AddRpc) message);
+    } else if(message instanceof RemoveRpc) {
+      removeRpc((RemoveRpc) message);
+    } else if(message instanceof AddRoutedRpc) {
+      addRoutedRpc((AddRoutedRpc) message);
+    } else if(message instanceof RemoveRoutedRpc) {
+      removeRoutedRpc((RemoveRoutedRpc) message);
+    }
+  }
+
+  private void getRoutedRpc(GetRoutedRpc rpcMsg){
+    LOG.debug("Get latest routed Rpc location from routing table {}", rpcMsg);
+    String remoteActorPath = routingTable.getLastAddedRoutedRpc(rpcMsg.getRouteId());
+    GetRoutedRpcReply routedRpcReply = new GetRoutedRpcReply(remoteActorPath);
+
+    getSender().tell(routedRpcReply, self());
+  }
+
+  private void getRpc(GetRpc rpcMsg) {
+    LOG.debug("Get global Rpc location from routing table {}", rpcMsg);
+    String remoteActorPath = routingTable.getGlobalRoute(rpcMsg.getRouteId());
+    GetRpcReply rpcReply = new GetRpcReply(remoteActorPath);
+
+    getSender().tell(rpcReply, self());
+  }
+
+  private void addRpc(AddRpc rpcMsg) {
+    LOG.debug("Add Rpc to routing table {}", rpcMsg);
+    routingTable.addGlobalRoute(rpcMsg.getRouteId(), rpcMsg.getActorPath());
+
+    getSender().tell("Success", self());
+  }
+
+  private void removeRpc(RemoveRpc rpcMsg) {
+    LOG.debug("Removing Rpc to routing table {}", rpcMsg);
+    routingTable.removeGlobalRoute(rpcMsg.getRouteId());
+
+    getSender().tell("Success", self());
+  }
+
+  private void addRoutedRpc(AddRoutedRpc rpcMsg) {
+    routingTable.addRoutedRpcs(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
+    getSender().tell("Success", self());
+  }
+
+  private void removeRoutedRpc(RemoveRoutedRpc rpcMsg) {
+    routingTable.removeRoutes(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
+    getSender().tell("Success", self());
+  }
+
+  private void syncRoutingTable(RoutingTableData routingTableData) {
+    LOG.debug("Syncing routing table {}", routingTableData);
+
+    Map<RpcRouter.RouteIdentifier<?, ?, ?>, String> newRpcMap = routingTableData.getRpcMap();
+    Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = newRpcMap.keySet();
+    for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
+      routingTable.addGlobalRoute(routeId, newRpcMap.get(routeId));
+    }
+
+    Map<RpcRouter.RouteIdentifier<?, ?, ?>, LinkedHashSet<String>> newRoutedRpcMap =
+        routingTableData.getRoutedRpcMap();
+    routeIds = newRoutedRpcMap.keySet();
+
+    for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
+      Set<String> routeAddresses = newRoutedRpcMap.get(routeId);
+      for(String routeAddress : routeAddresses) {
+        routingTable.addRoutedRpc(routeId, routeAddress);
+      }
+    }
+  }
+
+  private ActorSelection getRandomRegistryActor() {
+    ClusterEvent.CurrentClusterState clusterState = clusterWrapper.getState();
+    ActorSelection actor = null;
+    Set<Member> members = JavaConversions.asJavaSet(clusterState.members());
+    int memberSize = members.size();
+    // Don't select yourself
+    if(memberSize > 1) {
+      Address currentNodeAddress = clusterWrapper.getAddress();
+      int index = new Random().nextInt(memberSize);
+      int i = 0;
+      // keeping previous member, in case when random index member is same as current actor
+      // and current actor member is last in set
+      Member previousMember = null;
+      for(Member member : members){
+        if(i == index-1) {
+          previousMember = member;
+        }
+        if(i == index) {
+          if(!currentNodeAddress.equals(member.address())) {
+            actor = this.context().actorSelection(member.address() + ActorConstants.RPC_REGISTRY_PATH);
+            break;
+          } else if(index < memberSize-1){ // pick the next element in the set
+            index++;
+          }
+        }
+        i++;
+      }
+      if(actor == null && previousMember != null) {
+        actor = this.context().actorSelection(previousMember.address() + ActorConstants.RPC_REGISTRY_PATH);
+      }
+    }
+    return actor;
+  }
+
+  private class SendRoutingTable implements Runnable {
+
+    @Override
+    public void run() {
+      RoutingTableData routingTableData =
+          new RoutingTableData(routingTable.getGlobalRpcMap(), routingTable.getRoutedRpcMap());
+      LOG.debug("Sending routing table for sync {}", routingTableData);
+      ActorSelection actor = getRandomRegistryActor();
+      if(actor != null) {
+        actor.tell(routingTableData, self());
+      }
+    }
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java
new file mode 100644 (file)
index 0000000..f5dfbc5
--- /dev/null
@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+
+public interface Bucket<T extends Copier<T>> {
+    public Long getVersion();
+    public T getData();
+    public void setData(T data);
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java
new file mode 100644 (file)
index 0000000..3cdd924
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+import java.io.Serializable;
+
+public class BucketImpl<T extends Copier<T>> implements Bucket<T>, Serializable {
+
+    private Long version = System.currentTimeMillis();;
+
+    private T data;
+
+    @Override
+    public Long getVersion() {
+        return version;
+    }
+
+    @Override
+    public T getData() {
+        if (this.data == null)
+            return null;
+
+        return data.copy();
+    }
+
+    public void setData(T data){
+        this.version = System.currentTimeMillis()+1;
+        this.data = data;
+    }
+
+    @Override
+    public String toString() {
+        return "BucketImpl{" +
+                "version=" + version +
+                ", data=" + data +
+                '}';
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java
new file mode 100644 (file)
index 0000000..2f634ce
--- /dev/null
@@ -0,0 +1,273 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+import akka.actor.ActorRef;
+import akka.actor.Address;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.cluster.Cluster;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+
+/**
+ * A store that syncs its data across nodes in the cluster.
+ * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
+ * A node can write ONLY to its bucket. This way, write conflicts are avoided.
+ * <p>
+ * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol)<p>
+ * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
+ *
+ */
+public class BucketStore extends UntypedActor {
+
+    final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+
+    /**
+     * Bucket owned by the node
+     */
+    private BucketImpl localBucket = new BucketImpl();;
+
+    /**
+     * Buckets ownded by other known nodes in the cluster
+     */
+    private ConcurrentMap<Address, Bucket> remoteBuckets = new ConcurrentHashMap<>();
+
+    /**
+     * Bucket version for every known node in the cluster including this node
+     */
+    private ConcurrentMap<Address, Long> versions = new ConcurrentHashMap<>();
+
+    /**
+     * Cluster address for this node
+     */
+    private final Address selfAddress = Cluster.get(getContext().system()).selfAddress();
+
+    /**
+     * Our private gossiper
+     */
+    private ActorRef gossiper;
+
+    public BucketStore(){
+        gossiper = getContext().actorOf(Props.create(Gossiper.class), "gossiper");
+    }
+
+    /**
+     * This constructor is useful for testing.
+     * TODO: Pass Props instead of ActorRef
+     *
+     * @param gossiper
+     */
+    public BucketStore(ActorRef gossiper){
+        this.gossiper = gossiper;
+    }
+
+    @Override
+    public void onReceive(Object message) throws Exception {
+
+        log.debug("Received message: node[{}], message[{}]", selfAddress, message);
+
+        if (message instanceof UpdateBucket)
+            receiveUpdateBucket(((UpdateBucket) message).getBucket());
+
+        else if (message instanceof GetAllBuckets)
+            receiveGetAllBucket();
+
+        else if (message instanceof GetLocalBucket)
+            receiveGetLocalBucket();
+
+        else if (message instanceof GetBucketsByMembers)
+            receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
+
+        else if (message instanceof GetBucketVersions)
+            receiveGetBucketVersions();
+
+        else if (message instanceof UpdateRemoteBuckets)
+            receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets());
+
+        else {
+            log.debug("Unhandled message [{}]", message);
+            unhandled(message);
+        }
+
+    }
+
+    /**
+     * Returns a copy of bucket owned by this node
+     */
+    private void receiveGetLocalBucket() {
+        final ActorRef sender = getSender();
+        GetLocalBucketReply reply = new GetLocalBucketReply(localBucket);
+        sender.tell(reply, getSelf());
+    }
+
+    /**
+     * Updates the bucket owned by this node
+     *
+     * @param updatedBucket
+     */
+    void receiveUpdateBucket(Bucket updatedBucket){
+
+        localBucket = (BucketImpl) updatedBucket;
+        versions.put(selfAddress, localBucket.getVersion());
+    }
+
+    /**
+     * Returns all the buckets the this node knows about, self owned + remote
+     */
+    void receiveGetAllBucket(){
+        final ActorRef sender = getSender();
+        sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf());
+    }
+
+    /**
+     * Helper to collect all known buckets
+     *
+     * @return self owned + remote buckets
+     */
+    Map<Address, Bucket> getAllBuckets(){
+        Map<Address, Bucket> all = new HashMap<>(remoteBuckets.size() + 1);
+
+        //first add the local bucket
+        all.put(selfAddress, localBucket);
+
+        //then get all remote buckets
+        all.putAll(remoteBuckets);
+
+        return all;
+    }
+
+    /**
+     * Returns buckets for requested members that this node knows about
+     *
+     * @param members requested members
+     */
+    void receiveGetBucketsByMembers(Set<Address> members){
+        final ActorRef sender = getSender();
+        Map<Address, Bucket> buckets = getBucketsByMembers(members);
+        sender.tell(new GetBucketsByMembersReply(buckets), getSelf());
+    }
+
+    /**
+     * Helper to collect buckets for requested memebers
+     *
+     * @param members requested members