Merge "Add MD-SAL artifacts"
authorTony Tkacik <ttkacik@cisco.com>
Wed, 3 Dec 2014 10:27:44 +0000 (10:27 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 3 Dec 2014 10:27:45 +0000 (10:27 +0000)
140 files changed:
features/extras/pom.xml [new file with mode: 0644]
features/extras/src/main/resources/features.xml [new file with mode: 0644]
features/flow/pom.xml
features/flow/src/main/resources/features.xml
features/pom.xml
opendaylight/commons/opendaylight/pom.xml
opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/controller/config/yang/protocol/framework/NeverReconnectStrategyModuleTest.java
opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/controller/config/yang/protocol/framework/ReconnectImmediatelyStrategyModuleTest.java
opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/controller/config/yang/protocol/framework/TimedReconnectStrategyModuleTest.java
opendaylight/config/config-manager/src/test/java/org/opendaylight/controller/config/manager/testingservices/parallelapsp/test/DependentWiringTest.java
opendaylight/config/config-manager/src/test/java/org/opendaylight/controller/config/manager/testingservices/scheduledthreadpool/test/TwoInterfacesExportTest.java
opendaylight/config/config-manager/src/test/java/org/opendaylight/controller/config/manager/testingservices/threadpool/test/SimpleConfigurationTest.java
opendaylight/config/logback-config/src/main/java/org/opendaylight/controller/config/yang/logback/config/ContextSetterImpl.java
opendaylight/config/logback-config/src/main/java/org/opendaylight/controller/config/yang/logback/config/LogbackModule.java
opendaylight/config/logback-config/src/main/java/org/opendaylight/controller/config/yang/logback/config/LogbackStatusListener.java
opendaylight/config/logback-config/src/test/java/org/opendaylight/controller/config/yang/logback/config/ContextSetterImplTest.java
opendaylight/config/logback-config/src/test/java/org/opendaylight/controller/config/yang/logback/config/LogbackModuleTest.java
opendaylight/config/logback-config/src/test/java/org/opendaylight/controller/config/yang/logback/config/LogbackModuleWithInitialConfigurationTest.java
opendaylight/config/logback-config/src/test/java/org/opendaylight/controller/config/yang/logback/config/LogbackWithXmlConfigModuleTest.java
opendaylight/config/netty-event-executor-config/src/main/java/org/opendaylight/controller/config/yang/netty/eventexecutor/AutoCloseableEventExecutor.java
opendaylight/config/netty-event-executor-config/src/main/java/org/opendaylight/controller/config/yang/netty/eventexecutor/GlobalEventExecutorModuleFactory.java
opendaylight/config/netty-event-executor-config/src/main/java/org/opendaylight/controller/config/yang/netty/eventexecutor/ImmediateEventExecutorModuleFactory.java
opendaylight/config/netty-event-executor-config/src/test/java/org/opendaylight/controller/config/yang/netty/eventexecutor/GlobalEventExecutorModuleTest.java
opendaylight/config/netty-event-executor-config/src/test/java/org/opendaylight/controller/config/yang/netty/eventexecutor/ImmediateEventExecutorModuleTest.java
opendaylight/config/netty-threadgroup-config/src/main/java/org/opendaylight/controller/config/yang/netty/threadgroup/NettyThreadgroupModule.java
opendaylight/config/netty-threadgroup-config/src/test/java/org/opendaylight/controller/config/yang/netty/threadgroup/NettyThreadgroupModuleTest.java
opendaylight/config/pom.xml
opendaylight/distribution/opendaylight-karaf/pom.xml
opendaylight/md-sal/pom.xml
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleRoleChangeListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/RegisterListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/SetNotifiers.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogEntry.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplEntry.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/resources/application.conf
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java
opendaylight/md-sal/sal-binding-broker/pom.xml
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/DefaultRuntimeCodeGenerator.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/RuntimeCodeGenerator.xtend [deleted file]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/SingletonHolder.java
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/codegen/impl/DefaultRuntimeCodeGeneratorTest.java [moved from opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/RuntimeCodeGeneratorTest.java with 97% similarity]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListenerReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotification.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChanged.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/CompositeModificationPayload.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/Payload.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/cluster/raft/InstallSnapshotMessages.java
opendaylight/md-sal/sal-clustering-commons/src/main/resources/InstallSnapshot.proto
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/util/TestModel.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java
opendaylight/md-sal/statistics-manager-config/pom.xml [new file with mode: 0644]
opendaylight/md-sal/statistics-manager-config/src/main/resources/initial/30-statistics-manager.xml [new file with mode: 0644]
opendaylight/md-sal/statistics-manager/pom.xml
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/config/yang/md/sal/statistics_manager/StatisticsManagerModule.java [new file with mode: 0644]
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/config/yang/md/sal/statistics_manager/StatisticsManagerModuleFactory.java [new file with mode: 0644]
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsManager.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsManagerActivator.java [deleted file]
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatisticsManagerConfig.java [new file with mode: 0644]
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatisticsManagerImpl.java
opendaylight/md-sal/statistics-manager/src/main/yang/statistics-manager.yang [new file with mode: 0644]
opendaylight/md-sal/statistics-manager/src/test/java/org/opendaylight/controller/md/statistics/manager/StatisticsManagerProvider.java [deleted file]
opendaylight/md-sal/statistics-manager/src/test/java/test/mock/FlowStatisticsTest.java [deleted file]
opendaylight/md-sal/statistics-manager/src/test/java/test/mock/GroupStatisticsTest.java [deleted file]
opendaylight/md-sal/statistics-manager/src/test/java/test/mock/MeterStatisticsTest.java [deleted file]
opendaylight/md-sal/statistics-manager/src/test/java/test/mock/NodeRegistrationTest.java
opendaylight/md-sal/statistics-manager/src/test/java/test/mock/PortStatisticsTest.java [deleted file]
opendaylight/md-sal/statistics-manager/src/test/java/test/mock/QueueStatisticsTest.java [deleted file]
opendaylight/md-sal/statistics-manager/src/test/java/test/mock/StatCollectorTest.java [new file with mode: 0644]
opendaylight/md-sal/statistics-manager/src/test/java/test/mock/TableStatisticsTest.java [deleted file]
opendaylight/md-sal/statistics-manager/src/test/java/test/mock/util/OpendaylightFlowStatisticsServiceMock.java
opendaylight/md-sal/statistics-manager/src/test/java/test/mock/util/OpendaylightFlowTableStatisticsServiceMock.java
opendaylight/md-sal/statistics-manager/src/test/java/test/mock/util/OpendaylightGroupStatisticsServiceMock.java
opendaylight/md-sal/statistics-manager/src/test/java/test/mock/util/OpendaylightMeterStatisticsServiceMock.java
opendaylight/md-sal/statistics-manager/src/test/java/test/mock/util/OpendaylightPortStatisticsServiceMock.java
opendaylight/md-sal/statistics-manager/src/test/java/test/mock/util/OpendaylightQueueStatisticsServiceMock.java
opendaylight/md-sal/statistics-manager/src/test/java/test/mock/util/ProviderContextMock.java [deleted file]
opendaylight/md-sal/statistics-manager/src/test/java/test/mock/util/StatisticsManagerTest.java
opendaylight/md-sal/statistics-manager/src/test/java/test/mock/util/TestUtils.java
opendaylight/md-sal/topology-lldp-discovery/pom.xml
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/Main.java
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/NetconfDeviceSimulator.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/NetconfUtil.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/exception/MissingNameSpaceException.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/exception/UnexpectedElementException.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/exception/UnexpectedNamespaceException.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfHelloMessage.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfHelloMessageAdditionalHeader.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageHeader.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageUtil.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/SendErrorExceptionUtil.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/osgi/NetconfConfigUtil.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/xml/HardcodedNamespaceResolver.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/xml/XMLNetconfUtil.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/xml/XmlElement.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/xml/XmlNetconfValidator.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/xml/XmlUtil.java
opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/NetconfUtilTest.java
opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/mapping/AbstractLastNetconfOperationTest.java
opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/mapping/AbstractNetconfOperationTest.java
opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/mapping/AbstractSingletonNetconfOperationTest.java
opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/messages/NetconfHelloMessageAdditionalHeaderTest.java
opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/messages/NetconfHelloMessageTest.java
opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageHeaderTest.java
opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageUtilTest.java
opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/messages/SendErrorExceptionUtilTest.java
opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/osgi/NetconfConfigUtilTest.java
opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/test/XmlFileLoader.java
opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/xml/XmlElementTest.java
opendaylight/netconf/netconf-util/src/test/resources/netconfMessages/startExi.xml
opendaylight/netconf/netconf-util/src/test/resources/netconfMessages/stopExi.xml

diff --git a/features/extras/pom.xml b/features/extras/pom.xml
new file mode 100644 (file)
index 0000000..20089c7
--- /dev/null
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>commons.opendaylight</artifactId>
+        <version>1.5.0-SNAPSHOT</version>
+        <relativePath>../../opendaylight/commons/opendaylight</relativePath>
+    </parent>
+    <artifactId>features-extras</artifactId>
+    <groupId>org.opendaylight.controller</groupId>
+    <packaging>jar</packaging>
+    <properties>
+        <features.file>features.xml</features.file>
+        <!-- Optional TODO: Move these properties to your parent pom and possibly
+              DependencyManagement section of your parent pom -->
+        <branding.version>1.1.0-SNAPSHOT</branding.version>
+        <karaf.resources.version>1.5.0-SNAPSHOT</karaf.resources.version>
+        <karaf.version>3.0.1</karaf.version>
+        <feature.test.version>0.7.0-SNAPSHOT</feature.test.version>
+        <karaf.empty.version>1.5.0-SNAPSHOT</karaf.empty.version>
+        <surefire.version>2.16</surefire.version>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.jolokia</groupId>
+            <artifactId>jolokia-osgi</artifactId>
+            <version>${jolokia.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools</groupId>
+            <artifactId>features-test</artifactId>
+            <version>${feature.test.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <!-- dependency for opendaylight-karaf-empty for use by testing -->
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>opendaylight-karaf-empty</artifactId>
+            <version>${karaf.empty.version}</version>
+            <type>zip</type>
+        </dependency>
+    </dependencies>
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-resources-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>filter</id>
+                        <phase>generate-resources</phase>
+                        <goals>
+                            <goal>resources</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>attach-artifacts</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>attach-artifact</goal>
+                        </goals>
+                        <configuration>
+                            <artifacts>
+                                <artifact>
+                                    <file>${project.build.directory}/classes/${features.file}</file>
+                                    <type>xml</type>
+                                    <classifier>features</classifier>
+                                </artifact>
+                            </artifacts>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>${surefire.version}</version>
+                <configuration>
+                    <systemPropertyVariables>
+                        <karaf.distro.groupId>org.opendaylight.controller</karaf.distro.groupId>
+                        <karaf.distro.artifactId>opendaylight-karaf-empty</karaf.distro.artifactId>
+                        <karaf.distro.version>${karaf.empty.version}</karaf.distro.version>
+                    </systemPropertyVariables>
+                    <dependenciesToScan>
+                        <dependency>org.opendaylight.yangtools:features-test</dependency>
+                    </dependenciesToScan>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+    <scm>
+        <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
+        <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+        <tag>HEAD</tag>
+        <url>https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=summary</url>
+    </scm>
+</project>
diff --git a/features/extras/src/main/resources/features.xml b/features/extras/src/main/resources/features.xml
new file mode 100644 (file)
index 0000000..e7423ea
--- /dev/null
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- vi: set et smarttab sw=4 tabstop=4: -->
+<!--
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+
+<!--
+
+ This feature file is intended to contain only third party features that cannot be accommodated in any
+ other feature file. This is a good place to add features like jolokia which no other feature depends on
+ but which provides a utility.
+-->
+<features name="odl-extras-${project.version}" xmlns="http://karaf.apache.org/xmlns/features/v1.2.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://karaf.apache.org/xmlns/features/v1.2.0 http://karaf.apache.org/xmlns/features/v1.2.0">
+    <feature name='odl-extras-all' version='${project.version}' description='OpenDaylight :: Extras :: All'>
+        <feature version="${project.version}">odl-jolokia</feature>
+    </feature>
+    <feature name="odl-jolokia" version="${project.version}" description="Jolokia JMX/HTTP bridge">
+        <feature>http</feature>
+        <bundle>mvn:org.jolokia/jolokia-osgi/${jolokia.version}</bundle>
+    </feature>
+</features>
index ab30b0b..53b45d2 100644 (file)
       <classifier>features</classifier>
       <type>xml</type>
     </dependency>
+
+  <dependency>
+      <groupId>org.opendaylight.controller.md</groupId>
+      <artifactId>statistics-manager-config</artifactId>
+      <version>${mdsal.version}</version>
+      <type>xml</type>
+      <classifier>config</classifier>
+  </dependency>
+
     <dependency>
       <groupId>org.opendaylight.controller.model</groupId>
       <artifactId>model-flow-base</artifactId>
index 0540458..cf54e8b 100644 (file)
@@ -21,6 +21,7 @@
         <bundle>mvn:org.opendaylight.controller.md/inventory-manager/${project.version}</bundle>
         <bundle>mvn:org.opendaylight.controller.md/forwardingrules-manager/${project.version}</bundle>
         <bundle>mvn:org.opendaylight.controller/liblldp/${sal.version}</bundle>
+        <configfile finalname="${config.configfile.directory}/${config.statistics.manager.configfile}">mvn:org.opendaylight.controller.md/statistics-manager-config/${mdsal.version}/xml/config</configfile>
     </feature>
 
 </features>
index ba938c6..8be9f55 100644 (file)
@@ -23,5 +23,6 @@
     <module>akka</module>
     <module>netconf-connector</module>
     <module>restconf</module>
+    <module>extras</module>
   </modules>
-</project>
\ No newline at end of file
+</project>
index 135b013..8596412 100644 (file)
@@ -87,6 +87,7 @@
     <controllermanager.northbound.version>0.1.0-SNAPSHOT</controllermanager.northbound.version>
     <devices.web.version>0.5.0-SNAPSHOT</devices.web.version>
     <dummy-console.version>1.2.0-SNAPSHOT</dummy-console.version>
+    <config.statistics.manager.configfile>30-statistics-manager.xml</config.statistics.manager.configfile>
     <eclipse.persistence.version>2.5.0</eclipse.persistence.version>
     <eclipse.jdt.core.compiler.batch.version>3.8.0.I20120518-2145</eclipse.jdt.core.compiler.batch.version>
     <!-- enforcer version -->
     <usermanager.version>0.5.0-SNAPSHOT</usermanager.version>
     <nsf.version>0.5.0-SNAPSHOT</nsf.version>
     <web.version>0.5.0-SNAPSHOT</web.version>
-    <xtend.dstdir>src/main/xtend-gen</xtend.dstdir>
     <yang-ext.version>2013.09.07.7-SNAPSHOT</yang-ext.version>
     <yang-jmx-generator.version>1.1.0-SNAPSHOT</yang-jmx-generator.version>
     <yangtools.version>0.7.0-SNAPSHOT</yangtools.version>
       </dependency>
 
       <!-- md-sal -->
-      <dependency>
-        <groupId>org.eclipse.xtend</groupId>
-        <artifactId>org.eclipse.xtend.lib</artifactId>
-        <version>${xtend.version}</version>
-      </dependency>
       <dependency>
         <groupId>org.glassfish.jersey.test-framework.providers</groupId>
         <artifactId>jersey-test-framework-provider-grizzly2</artifactId>
           <artifactId>maven-clean-plugin</artifactId>
           <configuration>
             <filesets>
-              <fileset>
-                <directory>${xtend.dstdir}</directory>
-                <includes>
-                  <include>**</include>
-                </includes>
-              </fileset>
               <fileset>
                 <directory>${jmxGeneratorPath}</directory>
                 <includes>
                   <source>src/main/yang</source>
                   <source>${jmxGeneratorPath}</source>
                   <source>${salGeneratorPath}</source>
-                  <source>${xtend.dstdir}</source>
                 </sources>
               </configuration>
             </execution>
             </lifecycleMappingMetadata>
           </configuration>
         </plugin>
-        <plugin>
-          <groupId>org.eclipse.xtend</groupId>
-          <artifactId>xtend-maven-plugin</artifactId>
-          <version>${xtend.version}</version>
-          <executions>
-            <execution>
-              <goals>
-                <goal>compile</goal>
-              </goals>
-              <configuration>
-                <outputDirectory>${xtend.dstdir}</outputDirectory>
-              </configuration>
-            </execution>
-          </executions>
-        </plugin>
         <plugin>
           <groupId>org.jacoco</groupId>
           <artifactId>jacoco-maven-plugin</artifactId>
           <includeTestSourceDirectory>true</includeTestSourceDirectory>
           <sourceDirectory>${project.basedir}</sourceDirectory>
           <includes>**\/*.java,**\/*.xml,**\/*.ini,**\/*.sh,**\/*.bat</includes>
-          <excludes>**\/target\/,**\/bin\/,**\/target-ide\/,**\/${jmxGeneratorPath}\/,**\/${salGeneratorPath}\/,**\/xtend-gen\/,**\/protobuff\/</excludes>
+          <excludes>**\/target\/,**\/bin\/,**\/target-ide\/,**\/${jmxGeneratorPath}\/,**\/${salGeneratorPath}\/,**\/protobuff\/</excludes>
         </configuration>
         <dependencies>
           <dependency>
index 1148c6b..77589ed 100644 (file)
@@ -76,7 +76,7 @@ public class NeverReconnectStrategyModuleTest extends AbstractConfigTest {
         createInstance();
         final ConfigTransactionJMXClient transaction = configRegistryClient.createTransaction();
         assertBeanCount(1, FACTORY_NAME);
-        final NeverReconnectStrategyFactoryModuleMXBean mxBean = transaction.newMBeanProxy(
+        final NeverReconnectStrategyFactoryModuleMXBean mxBean = transaction.newMXBeanProxy(
                 transaction.lookupConfigBean(FACTORY_NAME, INSTANCE_NAME), NeverReconnectStrategyFactoryModuleMXBean.class);
         mxBean.setTimeout(200);
         final CommitStatus status = transaction.commit();
@@ -98,7 +98,7 @@ public class NeverReconnectStrategyModuleTest extends AbstractConfigTest {
     private static ObjectName createInstance(final ConfigTransactionJMXClient transaction, final Integer timeout)
             throws InstanceAlreadyExistsException {
         final ObjectName nameCreated = transaction.createModule(FACTORY_NAME, INSTANCE_NAME);
-        final NeverReconnectStrategyFactoryModuleMXBean mxBean = transaction.newMBeanProxy(nameCreated,
+        final NeverReconnectStrategyFactoryModuleMXBean mxBean = transaction.newMXBeanProxy(nameCreated,
                 NeverReconnectStrategyFactoryModuleMXBean.class);
         mxBean.setTimeout(timeout);
         mxBean.setExecutor(GlobalEventExecutorUtil.create(transaction));
index cfdf3bf..7e3ed5b 100644 (file)
@@ -75,7 +75,7 @@ public class ReconnectImmediatelyStrategyModuleTest extends AbstractConfigTest {
         createInstance();
         final ConfigTransactionJMXClient transaction = configRegistryClient.createTransaction();
         assertBeanCount(1, FACTORY_NAME);
-        final ReconnectImmediatelyStrategyFactoryModuleMXBean mxBean = transaction.newMBeanProxy(
+        final ReconnectImmediatelyStrategyFactoryModuleMXBean mxBean = transaction.newMXBeanProxy(
                 transaction.lookupConfigBean(FACTORY_NAME, INSTANCE_NAME),
                 ReconnectImmediatelyStrategyFactoryModuleMXBean.class);
         mxBean.setReconnectTimeout(200);
@@ -97,7 +97,7 @@ public class ReconnectImmediatelyStrategyModuleTest extends AbstractConfigTest {
     private static ObjectName createInstance(final ConfigTransactionJMXClient transaction, final Integer timeout)
             throws InstanceAlreadyExistsException {
         final ObjectName nameCreated = transaction.createModule(FACTORY_NAME, INSTANCE_NAME);
-        final ReconnectImmediatelyStrategyFactoryModuleMXBean mxBean = transaction.newMBeanProxy(nameCreated,
+        final ReconnectImmediatelyStrategyFactoryModuleMXBean mxBean = transaction.newMXBeanProxy(nameCreated,
                 ReconnectImmediatelyStrategyFactoryModuleMXBean.class);
         mxBean.setReconnectTimeout(timeout);
         mxBean.setReconnectExecutor(GlobalEventExecutorUtil.create(transaction));
index 1c068a9..ec8a9d6 100644 (file)
@@ -117,7 +117,7 @@ public class TimedReconnectStrategyModuleTest extends AbstractConfigTest {
         createInstance();
         final ConfigTransactionJMXClient transaction = configRegistryClient.createTransaction();
         assertBeanCount(1, FACTORY_NAME);
-        final TimedReconnectStrategyFactoryModuleMXBean mxBean = transaction.newMBeanProxy(
+        final TimedReconnectStrategyFactoryModuleMXBean mxBean = transaction.newMXBeanProxy(
                 transaction.lookupConfigBean(FACTORY_NAME, INSTANCE_NAME), TimedReconnectStrategyFactoryModuleMXBean.class);
         assertEquals(mxBean.getMinSleep(), new Long(100));
         mxBean.setMinSleep(200L);
@@ -149,7 +149,7 @@ public class TimedReconnectStrategyModuleTest extends AbstractConfigTest {
             final Integer connectTime, final Long minSleep, final BigDecimal sleepFactor, final Long maxSleep,
             final Long maxAttempts, final Long deadline) throws InstanceAlreadyExistsException {
         final ObjectName nameCreated = transaction.createModule(FACTORY_NAME, instanceName);
-        final TimedReconnectStrategyFactoryModuleMXBean mxBean = transaction.newMBeanProxy(nameCreated,
+        final TimedReconnectStrategyFactoryModuleMXBean mxBean = transaction.newMXBeanProxy(nameCreated,
                 TimedReconnectStrategyFactoryModuleMXBean.class);
         mxBean.setConnectTime(connectTime);
         mxBean.setDeadline(deadline);
index 598e6bf..fd8c8d1 100644 (file)
@@ -108,7 +108,7 @@ public class DependentWiringTest extends AbstractParallelAPSPTest {
 
         // test reported apsp number of threads
         TestingParallelAPSPConfigMXBean parallelAPSPRuntimeProxy = configRegistryClient
-                .newMBeanProxy(apspON, TestingParallelAPSPConfigMXBean.class);
+                .newMXBeanProxy(apspON, TestingParallelAPSPConfigMXBean.class);
         assertEquals(
                 (Integer) TestingParallelAPSPImpl.MINIMAL_NUMBER_OF_THREADS,
                 parallelAPSPRuntimeProxy.getMaxNumberOfThreads());
index fcefd06..0e6a54f 100644 (file)
@@ -15,10 +15,11 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import javax.annotation.Nullable;
-import javax.management.DynamicMBean;
 import javax.management.InstanceAlreadyExistsException;
 import javax.management.InstanceNotFoundException;
+import javax.management.IntrospectionException;
 import javax.management.ObjectName;
+import javax.management.ReflectionException;
 import org.junit.Test;
 import org.opendaylight.controller.config.api.ValidationException;
 import org.opendaylight.controller.config.api.jmx.ObjectNameUtil;
@@ -37,20 +38,17 @@ public class TwoInterfacesExportTest extends AbstractScheduledTest {
 
     private void assertExists(@Nullable final ConfigTransactionJMXClient transaction,
             final String moduleName, final String instanceName)
-            throws InstanceNotFoundException {
+            throws InstanceNotFoundException, IntrospectionException, ReflectionException {
         if (transaction != null) {
             transaction.lookupConfigBean(moduleName, instanceName);
             // make a dummy call
-            configRegistryClient.newMBeanProxy(
-                    ObjectNameUtil.createTransactionModuleON(
-                            transaction.getTransactionName(), moduleName,
-                            instanceName), DynamicMBean.class).getMBeanInfo();
+            platformMBeanServer.getMBeanInfo(ObjectNameUtil.createTransactionModuleON(
+                    transaction.getTransactionName(), moduleName, instanceName));
         } else {
             configRegistryClient.lookupConfigBean(moduleName, instanceName);
             // make a dummy call
-            configRegistryClient.newMBeanProxy(
-                    ObjectNameUtil.createReadOnlyModuleON(moduleName,
-                            instanceName), DynamicMBean.class).getMBeanInfo();
+            platformMBeanServer.getMBeanInfo(ObjectNameUtil.createReadOnlyModuleON(moduleName,
+                    instanceName));
         }
     }
 
@@ -173,7 +171,7 @@ public class TwoInterfacesExportTest extends AbstractScheduledTest {
 
         ObjectName apspName = transaction.createModule(
                 TestingParallelAPSPModuleFactory.NAME, "apsp1");
-        TestingParallelAPSPConfigMXBean apspProxy = transaction.newMBeanProxy(
+        TestingParallelAPSPConfigMXBean apspProxy = transaction.newMXBeanProxy(
                 apspName, TestingParallelAPSPConfigMXBean.class);
         apspProxy.setThreadPool(scheduledName);
         apspProxy.setSomeParam("someParam");
@@ -190,7 +188,7 @@ public class TwoInterfacesExportTest extends AbstractScheduledTest {
 
         ObjectName apspName = transaction.createModule(
                 TestingParallelAPSPModuleFactory.NAME, "apsp1");
-        TestingParallelAPSPConfigMXBean apspProxy = transaction.newMBeanProxy(
+        TestingParallelAPSPConfigMXBean apspProxy = transaction.newMXBeanProxy(
                 apspName, TestingParallelAPSPConfigMXBean.class);
         apspProxy.setThreadPool(ObjectNameUtil.createReadOnlyModuleON(
                 TestingScheduledThreadPoolModuleFactory.NAME, scheduled1));
index 4ed1003..c20d3bf 100644 (file)
@@ -212,11 +212,9 @@ public class SimpleConfigurationTest extends AbstractConfigTest {
         ObjectName fixed1name = firstCommit();
 
         // 2, check that configuration was copied to platform
-        DynamicMBean dynamicMBean = configRegistryClient.newMBeanProxy(
-                ObjectNameUtil.withoutTransactionName(fixed1name),
-                DynamicMBean.class);
-        dynamicMBean.getMBeanInfo();
-        assertEquals(numberOfThreads, dynamicMBean.getAttribute("ThreadCount"));
+        ObjectName on = ObjectNameUtil.withoutTransactionName(fixed1name);
+        platformMBeanServer.getMBeanInfo(on);
+        assertEquals(numberOfThreads, platformMBeanServer.getAttribute(on, "ThreadCount"));
 
         // 3, shutdown fixed1 in new transaction
         assertFalse(TestingFixedThreadPool.allExecutors.get(0).isShutdown());
@@ -249,10 +247,10 @@ public class SimpleConfigurationTest extends AbstractConfigTest {
 
         // dynamic config should be removed from platform
         try {
-            dynamicMBean.getMBeanInfo();
+            platformMBeanServer.getMBeanInfo(on);
             fail();
         } catch (Exception e) {
-            assertTrue(e.getCause() instanceof InstanceNotFoundException);
+            assertTrue(e instanceof InstanceNotFoundException);
         }
     }
 
index 087648c..9abf645 100644 (file)
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
 public class ContextSetterImpl implements ContextSetter, Closeable {
 
     private final LogbackStatusListener statusListener;
-    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(ContextSetterImpl.class);
+    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(ContextSetterImpl.class);
 
     public ContextSetterImpl(final LogbackRuntimeRegistrator rootRuntimeBeanRegistratorWrapper) {
         statusListener = new LogbackStatusListener(rootRuntimeBeanRegistratorWrapper);
@@ -81,11 +81,11 @@ public class ContextSetterImpl implements ContextSetter, Closeable {
         Map<String, Appender<ILoggingEvent>> appendersMap = getAppenders(module, context);
 
         for (LoggerTO logger : module.getLoggerTO()) {
-            LOGGER.trace("Setting configuration for logger {}", logger.getLoggerName());
+            LOG.trace("Setting configuration for logger {}", logger.getLoggerName());
             final ch.qos.logback.classic.Logger logbackLogger = context.getLogger(logger.getLoggerName());
 
             Optional<Set<Appender<ILoggingEvent>>> appendersBefore = getAppendersBefore(loggersBefore, logbackLogger);
-            LOGGER.trace("Logger {}: Appenders registered before: {}", logger.getLoggerName(),
+            LOG.trace("Logger {}: Appenders registered before: {}", logger.getLoggerName(),
                     appendersBefore.isPresent() ? appendersBefore.get() : "NO APPENDERS BEFORE");
 
             logbackLogger.setLevel(Level.toLevel(logger.getLevel()));
@@ -101,7 +101,7 @@ public class ContextSetterImpl implements ContextSetter, Closeable {
             for (String appenderName : logger.getAppenders()) {
                 if (appendersMap.containsKey(appenderName)) {
                     logbackLogger.addAppender(appendersMap.get(appenderName));
-                    LOGGER.trace("Logger {}: Adding new appender: {}", logger.getLoggerName(), appenderName);
+                    LOG.trace("Logger {}: Adding new appender: {}", logger.getLoggerName(), appenderName);
                 } else {
                     throw new IllegalStateException("No appender " + appenderName
                             + " found. This error should have been discovered by validation");
@@ -116,7 +116,7 @@ public class ContextSetterImpl implements ContextSetter, Closeable {
             for (Appender<ILoggingEvent> appenderBefore : appendersBefore.get()) {
                 logbackLogger.detachAppender(appenderBefore);
                 appenderBefore.stop();
-                LOGGER.trace("Logger {}: Removing old appender: {}", logger.getLoggerName(),
+                LOG.trace("Logger {}: Removing old appender: {}", logger.getLoggerName(),
                         appenderBefore.getName());
             }
             loggersBefore.remove(logbackLogger);
index adc69fe..0078120 100644 (file)
  */
 package org.opendaylight.controller.config.yang.logback.config;
 
+import com.google.common.collect.Sets;
 import java.util.Set;
-
 import org.opendaylight.controller.config.api.JmxAttribute;
 import org.opendaylight.controller.config.api.JmxAttributeValidationException;
 
-import com.google.common.collect.Sets;
-
 /**
 *
 */
index e8d161a..8111136 100644 (file)
  */
 package org.opendaylight.controller.config.yang.logback.config;
 
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.core.status.StatusBase;
+import ch.qos.logback.core.status.StatusListener;
+import ch.qos.logback.core.status.StatusManager;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-
 import org.slf4j.LoggerFactory;
 
-import ch.qos.logback.classic.LoggerContext;
-import ch.qos.logback.core.status.StatusBase;
-import ch.qos.logback.core.status.StatusListener;
-import ch.qos.logback.core.status.StatusManager;
-
 public class LogbackStatusListener implements StatusListener, LogbackRuntimeMXBean, Closeable {
 
     private final List<StatusTO> receivedStatuses;
index 4ba4a02..82bc586 100644 (file)
@@ -13,10 +13,15 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.Appender;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -25,14 +30,6 @@ import org.opendaylight.controller.config.api.DependencyResolver;
 import org.opendaylight.controller.config.api.ModuleIdentifier;
 import org.slf4j.LoggerFactory;
 
-import ch.qos.logback.classic.LoggerContext;
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.Appender;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-
 public class ContextSetterImplTest {
 
     @Mock
@@ -154,4 +151,5 @@ public class ContextSetterImplTest {
         a.setEncoderPattern("%-4relative [%thread] %-5level %logger{35} - %msg%n");
         return a;
     }
+
 }
index 9e07cc6..d190f04 100644 (file)
@@ -148,7 +148,7 @@ public class LogbackModuleTest extends AbstractConfigTest {
         try {
             createBeans(
 
-            true, "target/rollingApp", "", "30MB", 1, 5, "target/%i.log", "rolling", "consoleName", "ALL", "logger1",
+                true, "target/rollingApp", "", "30MB", 1, 5, "target/%i.log", "rolling", "consoleName", "ALL", "logger1",
                     "DEBUG", "FixedWindowRollingPolicy", 0, "FileAppender").commit();
             fail();
         } catch (ValidationException e) {
index 37bfb6d..999d834 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.config.yang.logback.config;
 
+import static org.junit.Assert.assertEquals;
+
 import ch.qos.logback.classic.LoggerContext;
 import ch.qos.logback.classic.encoder.PatternLayoutEncoder;
 import ch.qos.logback.classic.joran.JoranConfigurator;
@@ -15,6 +17,17 @@ import ch.qos.logback.core.joran.spi.JoranException;
 import ch.qos.logback.core.rolling.FixedWindowRollingPolicy;
 import ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy;
 import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.InstanceNotFoundException;
+import javax.management.JMX;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
 import org.apache.commons.io.FileUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -27,20 +40,6 @@ import org.opendaylight.controller.config.util.ConfigTransactionClient;
 import org.opendaylight.controller.config.util.ConfigTransactionJMXClient;
 import org.slf4j.LoggerFactory;
 
-import javax.management.InstanceAlreadyExistsException;
-import javax.management.InstanceNotFoundException;
-import javax.management.JMX;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import java.io.File;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
 public class LogbackModuleWithInitialConfigurationTest extends AbstractConfigTest {
 
     private LogbackModuleFactory factory;
index 3c21e57..9fcd389 100644 (file)
@@ -7,10 +7,21 @@
  */
 package org.opendaylight.controller.config.yang.logback.config;
 
+import static org.junit.Assert.assertEquals;
+
 import ch.qos.logback.classic.LoggerContext;
 import ch.qos.logback.classic.joran.JoranConfigurator;
 import ch.qos.logback.core.joran.spi.JoranException;
 import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.List;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.InstanceNotFoundException;
+import javax.management.JMX;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
 import org.apache.commons.io.FileUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -21,18 +32,6 @@ import org.opendaylight.controller.config.manager.impl.factoriesresolver.Hardcod
 import org.opendaylight.controller.config.util.ConfigTransactionJMXClient;
 import org.slf4j.LoggerFactory;
 
-import javax.management.InstanceAlreadyExistsException;
-import javax.management.InstanceNotFoundException;
-import javax.management.JMX;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import java.io.File;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
 public class LogbackWithXmlConfigModuleTest extends AbstractConfigTest {
 
     private LogbackModuleFactory factory;
index 1585bbf..1ca57c6 100644 (file)
  */
 package org.opendaylight.controller.config.yang.netty.eventexecutor;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import org.opendaylight.controller.config.api.DependencyResolver;
 import org.osgi.framework.BundleContext;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 public class GlobalEventExecutorModuleFactory extends org.opendaylight.controller.config.yang.netty.eventexecutor.AbstractGlobalEventExecutorModuleFactory {
     public static final String SINGLETON_NAME = "singleton";
 
index b3ec67b..2a804eb 100644 (file)
@@ -7,11 +7,11 @@
  */
 package org.opendaylight.controller.config.yang.netty.eventexecutor;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import org.opendaylight.controller.config.api.DependencyResolver;
 import org.osgi.framework.BundleContext;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 public class ImmediateEventExecutorModuleFactory extends org.opendaylight.controller.config.yang.netty.eventexecutor.AbstractImmediateEventExecutorModuleFactory {
     public static final String SINGLETON_NAME = "singleton";
 
index e89d82a..8fe3d7c 100644 (file)
@@ -8,6 +8,11 @@
 
 package org.opendaylight.controller.config.yang.netty.eventexecutor;
 
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.ObjectName;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.config.api.ConflictingVersionException;
@@ -17,12 +22,6 @@ import org.opendaylight.controller.config.manager.impl.AbstractConfigTest;
 import org.opendaylight.controller.config.manager.impl.factoriesresolver.HardcodedModuleFactoriesResolver;
 import org.opendaylight.controller.config.util.ConfigTransactionJMXClient;
 
-import javax.management.InstanceAlreadyExistsException;
-import javax.management.ObjectName;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 public class GlobalEventExecutorModuleTest extends AbstractConfigTest {
 
     private GlobalEventExecutorModuleFactory factory;
@@ -79,7 +78,7 @@ public class GlobalEventExecutorModuleTest extends AbstractConfigTest {
     private ObjectName createInstance(ConfigTransactionJMXClient transaction, String instanceName)
             throws InstanceAlreadyExistsException {
         ObjectName nameCreated = transaction.createModule(factory.getImplementationName(), instanceName);
-        transaction.newMBeanProxy(nameCreated, GlobalEventExecutorModuleMXBean.class);
+        transaction.newMXBeanProxy(nameCreated, GlobalEventExecutorModuleMXBean.class);
         return nameCreated;
     }
 
index 54c8760..d280bca 100644 (file)
@@ -64,7 +64,7 @@ public class ImmediateEventExecutorModuleTest extends AbstractConfigTest {
     private ObjectName createInstance(ConfigTransactionJMXClient transaction, String instanceName)
             throws InstanceAlreadyExistsException {
         ObjectName nameCreated = transaction.createModule(factory.getImplementationName(), instanceName);
-        transaction.newMBeanProxy(nameCreated, ImmediateEventExecutorModuleMXBean.class);
+        transaction.newMXBeanProxy(nameCreated, ImmediateEventExecutorModuleMXBean.class);
         return nameCreated;
     }
 
index 76134b0..ceefcda 100644 (file)
@@ -7,6 +7,9 @@
  */
 package org.opendaylight.controller.config.yang.netty.threadgroup;
 
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.InstanceNotFoundException;
+import javax.management.ObjectName;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.config.api.ConflictingVersionException;
@@ -16,10 +19,6 @@ import org.opendaylight.controller.config.manager.impl.AbstractConfigTest;
 import org.opendaylight.controller.config.manager.impl.factoriesresolver.HardcodedModuleFactoriesResolver;
 import org.opendaylight.controller.config.util.ConfigTransactionJMXClient;
 
-import javax.management.InstanceAlreadyExistsException;
-import javax.management.InstanceNotFoundException;
-import javax.management.ObjectName;
-
 public class NettyThreadgroupModuleTest extends AbstractConfigTest {
 
     private NettyThreadgroupModuleFactory factory;
index ad2607a..fc447aa 100644 (file)
           <includeTestSourceDirectory>true</includeTestSourceDirectory>
           <sourceDirectory>${project.basedir}</sourceDirectory>
           <includes>**\/*.java,**\/*.xml,**\/*.ini,**\/*.sh,**\/*.bat,**\/*.yang</includes>
-          <excludes>**\/target\/,**\/bin\/,**\/target-ide\/,**\/${jmxGeneratorPath}\/,**\/${salGeneratorPath}\/</excludes>
+          <!-- excluding logback-config, has several checkstyle warnings
+               regarding Logger/LoggerFactory, which couldn't be removed due necessity/intention
+               to use the particular implementation/library of Logger/LoggerFactory -->
+          <excludes>**\/logback-config\/,**\/target\/,**\/bin\/,**\/target-ide\/,**\/${jmxGeneratorPath}\/,**\/${salGeneratorPath}\/</excludes>
         </configuration>
         <dependencies>
           <dependency>
index e44b569..f4ca221 100644 (file)
       <type>xml</type>
       <scope>runtime</scope>
     </dependency>
+      <dependency>
+          <groupId>org.opendaylight.controller</groupId>
+          <artifactId>features-extras</artifactId>
+          <version>${project.version}</version>
+          <classifier>features</classifier>
+          <type>xml</type>
+          <scope>runtime</scope>
+      </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>features-flow</artifactId>
index f2ba547..6c87899 100644 (file)
@@ -53,6 +53,7 @@
 
     <module>inventory-manager</module>
     <module>statistics-manager</module>
+    <module>statistics-manager-config</module>
     <module>topology-manager</module>
     <module>forwardingrules-manager</module>
     <module>topology-lldp-discovery</module>
             </lifecycleMappingMetadata>
           </configuration>
         </plugin>
-        <plugin>
-          <groupId>org.eclipse.xtend</groupId>
-          <artifactId>xtend-maven-plugin</artifactId>
-          <version>${xtend.version}</version>
-        </plugin>
         <plugin>
           <groupId>org.jacoco</groupId>
           <artifactId>jacoco-maven-plugin</artifactId>
index 6dfa4af..6c65021 100644 (file)
@@ -25,6 +25,7 @@ import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
 import org.opendaylight.controller.cluster.example.messages.PrintRole;
 import org.opendaylight.controller.cluster.example.messages.PrintState;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.RaftState;
@@ -37,22 +38,23 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payloa
  */
 public class ExampleActor extends RaftActor {
 
-    private final Map<String, String> state = new HashMap<>();
+    private final Map<String, String> state = new HashMap();
     private final DataPersistenceProvider dataPersistenceProvider;
 
     private long persistIdentifier = 1;
+    private Optional<ActorRef> roleChangeNotifier;
 
 
-    public ExampleActor(final String id, final Map<String, String> peerAddresses,
-        final Optional<ConfigParams> configParams) {
+    public ExampleActor(String id, Map<String, String> peerAddresses,
+        Optional<ConfigParams> configParams) {
         super(id, peerAddresses, configParams);
         this.dataPersistenceProvider = new PersistentDataProvider();
+        roleChangeNotifier = createRoleChangeNotifier(id);
     }
 
     public static Props props(final String id, final Map<String, String> peerAddresses,
         final Optional<ConfigParams> configParams){
         return Props.create(new Creator<ExampleActor>(){
-            private static final long serialVersionUID = 1L;
 
             @Override public ExampleActor create() throws Exception {
                 return new ExampleActor(id, peerAddresses, configParams);
@@ -60,7 +62,7 @@ public class ExampleActor extends RaftActor {
         });
     }
 
-    @Override public void onReceiveCommand(final Object message) throws Exception{
+    @Override public void onReceiveCommand(Object message) throws Exception{
         if(message instanceof KeyValue){
             if(isLeader()) {
                 String persistId = Long.toString(persistIdentifier++);
@@ -82,9 +84,11 @@ public class ExampleActor extends RaftActor {
                 String followers = "";
                 if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
                     followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
-                    LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), getPeers(), followers);
+                    LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
+                        getRaftActorContext().getPeerAddresses().keySet(), followers);
                 } else {
-                    LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers());
+                    LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
+                        getRaftActorContext().getPeerAddresses().keySet());
                 }
 
 
@@ -95,6 +99,23 @@ public class ExampleActor extends RaftActor {
         }
     }
 
+    protected String getReplicatedLogState() {
+        return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex()
+            + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm()
+            + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size();
+    }
+
+    public Optional<ActorRef> createRoleChangeNotifier(String actorId) {
+        ActorRef exampleRoleChangeNotifier = this.getContext().actorOf(
+            RoleChangeNotifier.getProps(actorId), actorId + "-notifier");
+        return Optional.<ActorRef>of(exampleRoleChangeNotifier);
+    }
+
+    @Override
+    protected Optional<ActorRef> getRoleChangeNotifier() {
+        return roleChangeNotifier;
+    }
+
     @Override protected void applyState(final ActorRef clientActor, final String identifier,
         final Object data) {
         if(data instanceof KeyValue){
@@ -116,19 +137,19 @@ public class ExampleActor extends RaftActor {
         getSelf().tell(new CaptureSnapshotReply(bs), null);
     }
 
-    @Override protected void applySnapshot(final ByteString snapshot) {
+    @Override protected void applySnapshot(ByteString snapshot) {
         state.clear();
         try {
-            state.putAll((Map<String, String>) toObject(snapshot));
+            state.putAll((HashMap) toObject(snapshot));
         } catch (Exception e) {
            LOG.error(e, "Exception in applying snapshot");
         }
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Snapshot applied to state : {}", ((Map<?, ?>) state).size());
+            LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size());
         }
     }
 
-    private ByteString fromObject(final Object snapshot) throws Exception {
+    private ByteString fromObject(Object snapshot) throws Exception {
         ByteArrayOutputStream b = null;
         ObjectOutputStream o = null;
         try {
@@ -148,7 +169,7 @@ public class ExampleActor extends RaftActor {
         }
     }
 
-    private Object toObject(final ByteString bs) throws ClassNotFoundException, IOException {
+    private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
         Object obj = null;
         ByteArrayInputStream bis = null;
         ObjectInputStream ois = null;
@@ -176,7 +197,7 @@ public class ExampleActor extends RaftActor {
         return dataPersistenceProvider;
     }
 
-    @Override public void onReceiveRecover(final Object message)throws Exception {
+    @Override public void onReceiveRecover(Object message)throws Exception {
         super.onReceiveRecover(message);
     }
 
@@ -185,11 +206,11 @@ public class ExampleActor extends RaftActor {
     }
 
     @Override
-    protected void startLogRecoveryBatch(final int maxBatchSize) {
+    protected void startLogRecoveryBatch(int maxBatchSize) {
     }
 
     @Override
-    protected void appendRecoveredLogEntry(final Payload data) {
+    protected void appendRecoveredLogEntry(Payload data) {
     }
 
     @Override
@@ -201,6 +222,6 @@ public class ExampleActor extends RaftActor {
     }
 
     @Override
-    protected void applyRecoverySnapshot(final ByteString snapshot) {
+    protected void applyRecoverySnapshot(ByteString snapshot) {
     }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleRoleChangeListener.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleRoleChangeListener.java
new file mode 100644 (file)
index 0000000..c0ee095
--- /dev/null
@@ -0,0 +1,146 @@
+package org.opendaylight.controller.cluster.example;
+
+import akka.actor.Actor;
+import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import akka.actor.Props;
+import akka.japi.Creator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.example.messages.RegisterListener;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
+import scala.concurrent.Await;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * This is a sample implementation of a Role Change Listener which is an actor, which registers itself to the ClusterRoleChangeNotifier
+ * <p/>
+ * The Role Change listener receives a SetNotifiers message with the notifiers to register itself with.
+ * <p/>
+ * It kicks of a scheduler which sents registration messages to the notifiers, till it gets a RegisterRoleChangeListenerReply
+ * <p/>
+ * If all the notifiers have been regsitered with, then it cancels the scheduler.
+ * It starts the scheduler again when it receives a new registration
+ *
+ */
+public class ExampleRoleChangeListener extends AbstractUntypedActor implements AutoCloseable{
+    // the akka url should be set to the notifiers actor-system and domain.
+    private static final String NOTIFIER_AKKA_URL = "akka.tcp://raft-test@127.0.0.1:2550/user/";
+
+    private Map<String, Boolean> notifierRegistrationStatus = new HashMap<>();
+    private Cancellable registrationSchedule = null;
+    private static final FiniteDuration duration = new FiniteDuration(100, TimeUnit.MILLISECONDS);
+    private static final FiniteDuration schedulerDuration = new FiniteDuration(1, TimeUnit.SECONDS);
+    private final String memberName;
+    private static final String[] shardsToMonitor = new String[] {"example"};
+
+    public ExampleRoleChangeListener(String memberName) {
+        super();
+        scheduleRegistrationListener(schedulerDuration);
+        this.memberName = memberName;
+        populateRegistry(memberName);
+    }
+
+    public static Props getProps(final String memberName) {
+        return Props.create(new Creator<Actor>() {
+            @Override
+            public Actor create() throws Exception {
+                return new ExampleRoleChangeListener(memberName);
+            }
+        });
+    }
+
+    @Override
+    protected void handleReceive(Object message) throws Exception {
+        if (message instanceof RegisterListener) {
+            // called by the scheduler at intervals to register any unregistered notifiers
+            sendRegistrationRequests();
+
+        } else if (message instanceof RegisterRoleChangeListenerReply) {
+            // called by the Notifier
+            handleRegisterRoleChangeListenerReply(getSender().path().toString());
+
+        } else if (message instanceof RoleChangeNotification) {
+            // called by the Notifier
+            RoleChangeNotification notification = (RoleChangeNotification) message;
+
+            LOG.info("Role Change Notification received for member:{}, old role:{}, new role:{}",
+                notification.getMemberId(), notification.getOldRole(), notification.getNewRole());
+
+            // the apps dependent on such notifications can be called here
+            //TODO: add implementation here
+
+        }
+    }
+
+    private void scheduleRegistrationListener(FiniteDuration interval) {
+        LOG.debug("--->scheduleRegistrationListener called.");
+        registrationSchedule = getContext().system().scheduler().schedule(
+            interval, interval, getSelf(), new RegisterListener(),
+            getContext().system().dispatcher(), getSelf());
+
+    }
+
+    private void populateRegistry(String memberName) {
+
+        for (String shard: shardsToMonitor) {
+            String notifier =(new StringBuilder()).append(NOTIFIER_AKKA_URL).append(memberName)
+                .append("/").append(memberName).append("-notifier").toString();
+
+            if (!notifierRegistrationStatus.containsKey(notifier)) {
+                notifierRegistrationStatus.put(notifier, false);
+            }
+        }
+
+        if (!registrationSchedule.isCancelled()) {
+            scheduleRegistrationListener(schedulerDuration);
+        }
+    }
+
+    private void sendRegistrationRequests() {
+        for (Map.Entry<String, Boolean> entry : notifierRegistrationStatus.entrySet()) {
+            if (!entry.getValue()) {
+                try {
+                    LOG.debug("{} registering with {}", getSelf().path().toString(), entry.getKey());
+                    ActorRef notifier = Await.result(
+                        getContext().actorSelection(entry.getKey()).resolveOne(duration), duration);
+
+                    notifier.tell(new RegisterRoleChangeListener(), getSelf());
+
+                } catch (Exception e) {
+                    LOG.error("ERROR!! Unable to send registration request to notifier {}", entry.getKey());
+                }
+            }
+        }
+    }
+
+    private void handleRegisterRoleChangeListenerReply(String senderId) {
+        if (notifierRegistrationStatus.containsKey(senderId)) {
+            notifierRegistrationStatus.put(senderId, true);
+
+            //cancel the schedule when listener is registered with all notifiers
+            if (!registrationSchedule.isCancelled()) {
+                boolean cancelScheduler = true;
+                for (Boolean value : notifierRegistrationStatus.values()) {
+                    cancelScheduler = cancelScheduler & value;
+                }
+                if (cancelScheduler) {
+                    registrationSchedule.cancel();
+                }
+            }
+        } else {
+            LOG.info("Unexpected, RegisterRoleChangeListenerReply received from notifier which is not known to Listener:{}",
+                senderId);
+        }
+    }
+
+
+    @Override
+    public void close() throws Exception {
+        registrationSchedule.cancel();
+    }
+}
index de61697..cd2e4a5 100644 (file)
@@ -3,15 +3,17 @@ package org.opendaylight.controller.cluster.example;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import com.google.common.base.Optional;
-import org.opendaylight.controller.cluster.example.messages.PrintRole;
-import org.opendaylight.controller.cluster.example.messages.PrintState;
-import org.opendaylight.controller.cluster.raft.ConfigParams;
-
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigFactory;
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import org.opendaylight.controller.cluster.example.messages.PrintRole;
+import org.opendaylight.controller.cluster.example.messages.PrintState;
+import org.opendaylight.controller.cluster.raft.ConfigParams;
 
 /**
  * This is a test driver for testing akka-raft implementation
@@ -21,7 +23,7 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public class TestDriver {
 
-    private static final ActorSystem actorSystem = ActorSystem.create();
+
     private static Map<String, String> allPeers = new HashMap<>();
     private static Map<String, ActorRef> clientActorRefs  = new HashMap<String, ActorRef>();
     private static Map<String, ActorRef> actorRefs = new HashMap<String, ActorRef>();
@@ -29,6 +31,9 @@ public class TestDriver {
     private int nameCounter = 0;
     private static ConfigParams configParams = new ExampleConfigParamsImpl();
 
+    private static ActorSystem actorSystem;
+    private static ActorSystem listenerActorSystem;
+
     /**
      * Create nodes, add clients and start logging.
      * Commands
@@ -53,6 +58,13 @@ public class TestDriver {
      * @throws Exception
      */
     public static void main(String[] args) throws Exception {
+
+        actorSystem = ActorSystem.create("raft-test", ConfigFactory
+            .load().getConfig("raft-test"));
+
+        listenerActorSystem = ActorSystem.create("raft-test-listener", ConfigFactory
+            .load().getConfig("raft-test-listener"));
+
         TestDriver td = new TestDriver();
 
         System.out.println("Enter command (type bye to exit):");
@@ -113,6 +125,16 @@ public class TestDriver {
         }
     }
 
+    // create the listener using a separate actor system for each example actor
+    private void createClusterRoleChangeListener(List<String> memberIds) {
+        System.out.println("memberIds="+memberIds);
+        for (String memberId : memberIds) {
+            ActorRef listenerActor = listenerActorSystem.actorOf(
+                ExampleRoleChangeListener.getProps(memberId), memberId + "-role-change-listener");
+            System.out.println("Role Change Listener created:" + listenerActor.path().toString());
+        }
+    }
+
     public static ActorRef createExampleActor(String name) {
         return actorSystem.actorOf(ExampleActor.props(name, withoutPeer(name),
             Optional.of(configParams)), name);
@@ -121,7 +143,7 @@ public class TestDriver {
     public void createNodes(int num) {
         for (int i=0; i < num; i++)  {
             nameCounter = nameCounter + 1;
-            allPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
+            allPeers.put("example-"+nameCounter, "akka://raft-test/user/example-"+nameCounter);
         }
 
         for (String s : allPeers.keySet())  {
@@ -130,6 +152,8 @@ public class TestDriver {
             System.out.println("Created node:"+s);
 
         }
+
+        createClusterRoleChangeListener(Lists.newArrayList(allPeers.keySet()));
     }
 
     // add num clients to all nodes in the system
index e0873cc..d2862c2 100644 (file)
@@ -9,13 +9,12 @@
 package org.opendaylight.controller.cluster.example.messages;
 
 import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.protobuff.messages.cluster.example.KeyValueMessages;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.example.KeyValueMessages;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 
 public class KeyValue extends Payload implements Serializable {
     private static final long serialVersionUID = 1L;
@@ -71,4 +70,9 @@ public class KeyValue extends Payload implements Serializable {
         return this;
     }
 
+    @Override
+    public int size() {
+        return this.value.length() + this.key.length();
+    }
+
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/RegisterListener.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/RegisterListener.java
new file mode 100644 (file)
index 0000000..4507f43
--- /dev/null
@@ -0,0 +1,9 @@
+package org.opendaylight.controller.cluster.example.messages;
+
+/**
+ * Message sent by the Example Role Change Listener to itself for registering itself with the notifiers
+ *
+ * This message is sent by the scheduler
+ */
+public class RegisterListener {
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/SetNotifiers.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/SetNotifiers.java
new file mode 100644 (file)
index 0000000..8adc0da
--- /dev/null
@@ -0,0 +1,18 @@
+package org.opendaylight.controller.cluster.example.messages;
+
+import java.util.List;
+
+/**
+ * Created by kramesha on 11/18/14.
+ */
+public class SetNotifiers {
+    private List<String> notifierList;
+
+    public SetNotifiers(List<String> notifierList) {
+        this.notifierList = notifierList;
+    }
+
+    public List<String> getNotifierList() {
+        return notifierList;
+    }
+}
index a2c9d66..653520c 100644 (file)
@@ -26,6 +26,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     protected ArrayList<ReplicatedLogEntry> snapshottedJournal;
     protected long previousSnapshotIndex = -1;
     protected long previousSnapshotTerm = -1;
+    protected int dataSize = 0;
 
     public AbstractReplicatedLogImpl(long snapshotIndex,
         long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
@@ -198,6 +199,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         snapshottedJournal = null;
         previousSnapshotIndex = -1;
         previousSnapshotTerm = -1;
+        dataSize = 0;
     }
 
     @Override
index 433c3f7..4245cf1 100644 (file)
@@ -28,6 +28,14 @@ public interface ConfigParams {
      */
     long getSnapshotBatchCount();
 
+    /**
+     * The percentage of total memory in the in-memory Raft log before a snapshot
+     * is to be taken
+     *
+     * @return int
+     */
+    int getSnapshotDataThresholdPercentage();
+
     /**
      * The interval at which a heart beat message will be sent to the remote
      * RaftActor
index a209223..3a6bdbf 100644 (file)
@@ -7,9 +7,8 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.concurrent.TimeUnit;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Default implementation of the ConfigParams
@@ -47,6 +46,10 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     private FiniteDuration isolatedLeaderCheckInterval =
         new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000, HEART_BEAT_INTERVAL.unit());
 
+    // 12 is just an arbitrary percentage. This is the amount of the total memory that a raft actor's
+    // in-memory journal can use before it needs to snapshot
+    private int snapshotDataThresholdPercentage = 12;
+
     public void setHeartBeatInterval(FiniteDuration heartBeatInterval) {
         this.heartBeatInterval = heartBeatInterval;
     }
@@ -55,6 +58,10 @@ public class DefaultConfigParamsImpl implements ConfigParams {
         this.snapshotBatchCount = snapshotBatchCount;
     }
 
+    public void setSnapshotDataThresholdPercentage(int snapshotDataThresholdPercentage){
+        this.snapshotDataThresholdPercentage = snapshotDataThresholdPercentage;
+    }
+
     public void setJournalRecoveryLogBatchSize(int journalRecoveryLogBatchSize) {
         this.journalRecoveryLogBatchSize = journalRecoveryLogBatchSize;
     }
@@ -68,6 +75,12 @@ public class DefaultConfigParamsImpl implements ConfigParams {
         return snapshotBatchCount;
     }
 
+    @Override
+    public int getSnapshotDataThresholdPercentage() {
+        return snapshotDataThresholdPercentage;
+    }
+
+
     @Override
     public FiniteDuration getHeartBeatInterval() {
         return heartBeatInterval;
index 042b9fb..3b84692 100644 (file)
@@ -22,8 +22,11 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
 import com.google.protobuf.ByteString;
+import java.io.Serializable;
+import java.util.Map;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -41,9 +44,6 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 
-import java.io.Serializable;
-import java.util.Map;
-
 /**
  * RaftActor encapsulates a state machine that needs to be kept synchronized
  * in a cluster. It implements the RAFT algorithm as described in the paper
@@ -169,8 +169,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue()));
 
                 onRecoveryComplete();
+
+                RaftActorBehavior oldBehavior = currentBehavior;
                 currentBehavior = new Follower(context);
-                onStateChanged();
+                handleBehaviorChange(oldBehavior, currentBehavior);
             }
         }
     }
@@ -269,8 +271,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
             replicatedLog.snapshotTerm, replicatedLog.size());
 
+        RaftActorBehavior oldBehavior = currentBehavior;
         currentBehavior = new Follower(context);
-        onStateChanged();
+        handleBehaviorChange(oldBehavior, currentBehavior);
     }
 
     @Override public void handleCommand(Object message) {
@@ -366,26 +369,26 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             RaftActorBehavior oldBehavior = currentBehavior;
             currentBehavior = currentBehavior.handleMessage(getSender(), message);
 
-            if(oldBehavior != currentBehavior){
-                onStateChanged();
-            }
-
-            onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId());
+            handleBehaviorChange(oldBehavior, currentBehavior);
         }
     }
 
-    public java.util.Set<String> getPeers() {
-
-        return context.getPeerAddresses().keySet();
-    }
+    private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) {
+        if (oldBehavior != currentBehavior){
+            onStateChanged();
+        }
+        if (oldBehavior != null) {
+            // it can happen that the state has not changed but the leader has changed.
+            onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId());
 
-    protected String getReplicatedLogState() {
-        return "snapshotIndex=" + context.getReplicatedLog().getSnapshotIndex()
-            + ", snapshotTerm=" + context.getReplicatedLog().getSnapshotTerm()
-            + ", im-mem journal size=" + context.getReplicatedLog().size();
+            if (getRoleChangeNotifier().isPresent() && oldBehavior.state() != currentBehavior.state()) {
+                // we do not want to notify when the behavior/role is set for the first time (i.e follower)
+                getRoleChangeNotifier().get().tell(new RoleChanged(getId(), oldBehavior.state().name(),
+                    currentBehavior.state().name()), getSelf());
+            }
+        }
     }
 
-
     /**
      * When a derived RaftActor needs to persist something it must call
      * persistData.
@@ -578,6 +581,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     protected abstract DataPersistenceProvider persistence();
 
+    /**
+     * Notifier Actor for this RaftActor to notify when a role change happens
+     * @return ActorRef - ActorRef of the notifier or Optional.absent if none.
+     */
+    protected abstract Optional<ActorRef> getRoleChangeNotifier();
+
     protected void onLeaderChanged(String oldLeader, String newLeader){};
 
     private void trimPersistentData(long sequenceNumber) {
@@ -667,6 +676,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 @Override public void apply(DeleteEntries param)
                     throws Exception {
                     //FIXME : Doing nothing for now
+                    dataSize = 0;
+                    for(ReplicatedLogEntry entry : journal){
+                        dataSize += entry.size();
+                    }
                 }
             });
         }
@@ -676,6 +689,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             appendAndPersist(null, null, replicatedLogEntry);
         }
 
+        @Override
+        public int dataSize() {
+            return dataSize;
+        }
+
         public void appendAndPersist(final ActorRef clientActor,
             final String identifier,
             final ReplicatedLogEntry replicatedLogEntry) {
@@ -696,9 +714,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 new Procedure<ReplicatedLogEntry>() {
                     @Override
                     public void apply(ReplicatedLogEntry evt) throws Exception {
+                        dataSize += replicatedLogEntry.size();
+
+                        long dataThreshold = Runtime.getRuntime().totalMemory() *
+                                getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+
                         // when a snaphsot is being taken, captureSnapshot != null
                         if (hasSnapshotCaptureInitiated == false &&
-                            journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0) {
+                                ( journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0 ||
+                                        dataSize > dataThreshold)) {
 
                             LOG.info("Initiating Snapshot Capture..");
                             long lastAppliedIndex = -1;
@@ -843,4 +867,5 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     protected RaftActorBehavior getCurrentBehavior() {
         return currentBehavior;
     }
+
 }
index 7ee8532..80b7ad9 100644 (file)
@@ -171,4 +171,9 @@ public interface ReplicatedLog {
      * Restores the replicated log to a state in the event of a save snapshot failure
      */
     public void snapshotRollback();
+
+    /**
+     * Size of the data in the log (in bytes)
+     */
+    public int dataSize();
 }
index f501c4d..1979609 100644 (file)
@@ -34,4 +34,13 @@ public interface ReplicatedLogEntry {
      * @return
      */
     long getIndex();
+
+    /**
+     * The size of the entry in bytes.
+     *
+     * An approximate number may be good enough.
+     *
+     * @return
+     */
+    int size();
 }
index ceb5633..986c7f3 100644 (file)
@@ -8,9 +8,8 @@
 
 package org.opendaylight.controller.cluster.raft;
 
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-
 import java.io.Serializable;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 
 public class ReplicatedLogImplEntry implements ReplicatedLogEntry,
     Serializable {
@@ -39,6 +38,11 @@ public class ReplicatedLogImplEntry implements ReplicatedLogEntry,
         return index;
     }
 
+    @Override
+    public int size() {
+        return getData().size();
+    }
+
     @Override public String toString() {
         return "Entry{" +
             "index=" + index +
index d85ac8e..e5c5dc7 100644 (file)
@@ -67,6 +67,16 @@ import scala.concurrent.duration.FiniteDuration;
  * set commitIndex = N (§5.3, §5.4).
  */
 public abstract class AbstractLeader extends AbstractRaftActorBehavior {
+
+    // The index of the first chunk that is sent when installing a snapshot
+    public static final int FIRST_CHUNK_INDEX = 1;
+
+    // The index that the follower should respond with if it needs the install snapshot to be reset
+    public static final int INVALID_CHUNK_INDEX = -1;
+
+    // This would be passed as the hash code of the last chunk when sending the first chunk
+    public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
+
     protected final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
     protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
 
@@ -332,6 +342,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         "sending snapshot chunk failed, Will retry, Chunk:{}",
                     reply.getChunkIndex()
                 );
+
                 followerToSnapshot.markSendStatus(false);
             }
 
@@ -341,6 +352,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     " or Chunk Index in InstallSnapshotReply not matching {} != {}",
                 followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
             );
+
+            if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
+                // Since the Follower did not find this index to be valid we should reset the follower snapshot
+                // so that Installing the snapshot can resume from the beginning
+                followerToSnapshot.reset();
+            }
         }
     }
 
@@ -539,7 +556,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         context.getReplicatedLog().getSnapshotTerm(),
                         getNextSnapshotChunk(followerId,snapshot.get()),
                         mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
-                        mapFollowerToSnapshot.get(followerId).getTotalChunks()
+                        mapFollowerToSnapshot.get(followerId).getTotalChunks(),
+                        Optional.of(mapFollowerToSnapshot.get(followerId).getLastChunkHashCode())
                     ).toSerializable(),
                     actor()
                 );
@@ -636,11 +654,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         private boolean replyStatus = false;
         private int chunkIndex;
         private int totalChunks;
+        private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
+        private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
 
         public FollowerToSnapshot(ByteString snapshotBytes) {
             this.snapshotBytes = snapshotBytes;
-            replyReceivedForOffset = -1;
-            chunkIndex = 1;
             int size = snapshotBytes.size();
             totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
                 ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
@@ -648,6 +666,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 LOG.debug("Snapshot {} bytes, total chunks to send:{}",
                     size, totalChunks);
             }
+            replyReceivedForOffset = -1;
+            chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
         }
 
         public ByteString getSnapshotBytes() {
@@ -692,6 +712,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 // if the chunk sent was successful
                 replyReceivedForOffset = offset;
                 replyStatus = true;
+                lastChunkHashCode = nextChunkHashCode;
             } else {
                 // if the chunk sent was failure
                 replyReceivedForOffset = offset;
@@ -715,8 +736,24 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 LOG.debug("length={}, offset={},size={}",
                     snapshotLength, start, size);
             }
-            return getSnapshotBytes().substring(start, start + size);
+            ByteString substring = getSnapshotBytes().substring(start, start + size);
+            nextChunkHashCode = substring.hashCode();
+            return substring;
+        }
+
+        /**
+         * reset should be called when the Follower needs to be sent the snapshot from the beginning
+         */
+        public void reset(){
+            offset = 0;
+            replyStatus = false;
+            replyReceivedForOffset = offset;
+            chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
+            lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
+        }
 
+        public int getLastChunkHashCode() {
+            return lastChunkHashCode;
         }
     }
 
index 7ada8b3..b1c73f6 100644 (file)
@@ -9,7 +9,9 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
+import java.util.ArrayList;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
@@ -23,8 +25,6 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 
-import java.util.ArrayList;
-
 /**
  * The behavior of a RaftActor in the Follower state
  * <p/>
@@ -36,7 +36,8 @@ import java.util.ArrayList;
  * </ul>
  */
 public class Follower extends AbstractRaftActorBehavior {
-    private ByteString snapshotChunksCollected = ByteString.EMPTY;
+
+    private SnapshotTracker snapshotTracker = null;
 
     public Follower(RaftActorContext context) {
         super(context);
@@ -280,48 +281,54 @@ public class Follower extends AbstractRaftActorBehavior {
             );
         }
 
-        try {
-            if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) {
-                // this is the last chunk, create a snapshot object and apply
-
-                snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug("Last chunk received: snapshotChunksCollected.size:{}",
-                            snapshotChunksCollected.size());
-                }
+        if(snapshotTracker == null){
+            snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks());
+        }
 
-                Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(),
-                    new ArrayList<ReplicatedLogEntry>(),
-                    installSnapshot.getLastIncludedIndex(),
-                    installSnapshot.getLastIncludedTerm(),
-                    installSnapshot.getLastIncludedIndex(),
-                    installSnapshot.getLastIncludedTerm());
+        try {
+            if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
+                    installSnapshot.getLastChunkHashCode())){
+                Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(),
+                        new ArrayList<ReplicatedLogEntry>(),
+                        installSnapshot.getLastIncludedIndex(),
+                        installSnapshot.getLastIncludedTerm(),
+                        installSnapshot.getLastIncludedIndex(),
+                        installSnapshot.getLastIncludedTerm());
 
                 actor().tell(new ApplySnapshot(snapshot), actor());
 
-            } else {
-                // we have more to go
-                snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+                snapshotTracker = null;
 
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug("Chunk={},snapshotChunksCollected.size:{}",
-                        installSnapshot.getChunkIndex(), snapshotChunksCollected.size());
-                }
             }
 
             sender.tell(new InstallSnapshotReply(
-                currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
-                true), actor());
+                    currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
+                    true), actor());
+
+        } catch (SnapshotTracker.InvalidChunkException e) {
+
+            sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
+                    -1, false), actor());
+            snapshotTracker = null;
+
+        } catch (Exception e){
 
-        } catch (Exception e) {
             LOG.error(e, "Exception in InstallSnapshot of follower:");
             //send reply with success as false. The chunk will be sent again on failure
             sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
-                installSnapshot.getChunkIndex(), false), actor());
+                    installSnapshot.getChunkIndex(), false), actor());
+
         }
     }
 
     @Override public void close() throws Exception {
         stopElection();
     }
+
+    @VisibleForTesting
+    ByteString getSnapshotChunksCollected(){
+        return snapshotTracker != null ? snapshotTracker.getCollectedChunks() : ByteString.EMPTY;
+    }
+
+
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java
new file mode 100644 (file)
index 0000000..26fbde0
--- /dev/null
@@ -0,0 +1,85 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.event.LoggingAdapter;
+import com.google.common.base.Optional;
+import com.google.protobuf.ByteString;
+
+/**
+ * SnapshotTracker does house keeping for a snapshot that is being installed in chunks on the Follower
+ */
+public class SnapshotTracker {
+    private final LoggingAdapter LOG;
+    private final int totalChunks;
+    private ByteString collectedChunks = ByteString.EMPTY;
+    private int lastChunkIndex = AbstractLeader.FIRST_CHUNK_INDEX - 1;
+    private boolean sealed = false;
+    private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
+
+    SnapshotTracker(LoggingAdapter LOG, int totalChunks){
+        this.LOG = LOG;
+        this.totalChunks = totalChunks;
+    }
+
+    /**
+     * Adds a chunk to the tracker
+     *
+     * @param chunkIndex
+     * @param chunk
+     * @return true when the lastChunk is received
+     * @throws InvalidChunkException
+     */
+    boolean addChunk(int chunkIndex, ByteString chunk, Optional<Integer> lastChunkHashCode) throws InvalidChunkException{
+        if(sealed){
+            throw new InvalidChunkException("Invalid chunk received with chunkIndex " + chunkIndex + " all chunks already received");
+        }
+
+        if(lastChunkIndex + 1 != chunkIndex){
+            throw new InvalidChunkException("Expected chunkIndex " + (lastChunkIndex + 1) + " got " + chunkIndex);
+        }
+
+        if(lastChunkHashCode.isPresent()){
+            if(lastChunkHashCode.get() != this.lastChunkHashCode){
+                throw new InvalidChunkException("The hash code of the recorded last chunk does not match " +
+                        "the senders hash code expected " + lastChunkHashCode + " was " + lastChunkHashCode.get());
+            }
+        }
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Chunk={},collectedChunks.size:{}",
+                    chunkIndex, collectedChunks.size());
+        }
+
+        sealed = (chunkIndex == totalChunks);
+        lastChunkIndex = chunkIndex;
+        collectedChunks = collectedChunks.concat(chunk);
+        this.lastChunkHashCode = chunk.hashCode();
+        return sealed;
+    }
+
+    byte[] getSnapshot(){
+        if(!sealed) {
+            throw new IllegalStateException("lastChunk not received yet");
+        }
+
+        return collectedChunks.toByteArray();
+    }
+
+    ByteString getCollectedChunks(){
+        return collectedChunks;
+    }
+
+    public static class InvalidChunkException extends Exception {
+        InvalidChunkException(String message){
+            super(message);
+        }
+    }
+
+}
index 3c4e811..6337f8f 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.raft.messages;
 
+import com.google.common.base.Optional;
 import com.google.protobuf.ByteString;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
 
@@ -22,9 +23,10 @@ public class InstallSnapshot extends AbstractRaftRPC {
     private final ByteString data;
     private final int chunkIndex;
     private final int totalChunks;
+    private final Optional<Integer> lastChunkHashCode;
 
     public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
-        long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks) {
+        long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks, Optional<Integer> lastChunkHashCode) {
         super(term);
         this.leaderId = leaderId;
         this.lastIncludedIndex = lastIncludedIndex;
@@ -32,8 +34,15 @@ public class InstallSnapshot extends AbstractRaftRPC {
         this.data = data;
         this.chunkIndex = chunkIndex;
         this.totalChunks = totalChunks;
+        this.lastChunkHashCode = lastChunkHashCode;
     }
 
+    public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
+                           long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks) {
+        this(term, leaderId, lastIncludedIndex, lastIncludedTerm, data, chunkIndex, totalChunks, Optional.<Integer>absent());
+    }
+
+
     public String getLeaderId() {
         return leaderId;
     }
@@ -58,25 +67,38 @@ public class InstallSnapshot extends AbstractRaftRPC {
         return totalChunks;
     }
 
-    public <T extends Object> Object toSerializable(){
-        return InstallSnapshotMessages.InstallSnapshot.newBuilder()
-            .setLeaderId(this.getLeaderId())
-            .setChunkIndex(this.getChunkIndex())
-            .setData(this.getData())
-            .setLastIncludedIndex(this.getLastIncludedIndex())
-            .setLastIncludedTerm(this.getLastIncludedTerm())
-            .setTotalChunks(this.getTotalChunks()).build();
+    public Optional<Integer> getLastChunkHashCode() {
+        return lastChunkHashCode;
+    }
 
+    public <T extends Object> Object toSerializable(){
+        InstallSnapshotMessages.InstallSnapshot.Builder builder = InstallSnapshotMessages.InstallSnapshot.newBuilder()
+                .setLeaderId(this.getLeaderId())
+                .setChunkIndex(this.getChunkIndex())
+                .setData(this.getData())
+                .setLastIncludedIndex(this.getLastIncludedIndex())
+                .setLastIncludedTerm(this.getLastIncludedTerm())
+                .setTotalChunks(this.getTotalChunks());
+
+        if(lastChunkHashCode.isPresent()){
+            builder.setLastChunkHashCode(lastChunkHashCode.get());
+        }
+        return builder.build();
     }
 
     public static InstallSnapshot fromSerializable (Object o) {
         InstallSnapshotMessages.InstallSnapshot from =
             (InstallSnapshotMessages.InstallSnapshot) o;
 
+        Optional<Integer> lastChunkHashCode = Optional.absent();
+        if(from.hasLastChunkHashCode()){
+            lastChunkHashCode = Optional.of(from.getLastChunkHashCode());
+        }
+
         InstallSnapshot installSnapshot = new InstallSnapshot(from.getTerm(),
             from.getLeaderId(), from.getLastIncludedIndex(),
             from.getLastIncludedTerm(), from.getData(),
-            from.getChunkIndex(), from.getTotalChunks());
+            from.getChunkIndex(), from.getTotalChunks(), lastChunkHashCode);
 
         return installSnapshot;
     }
index 9e42a13..b2132b8 100644 (file)
@@ -13,8 +13,67 @@ akka {
 
         serialization-bindings {
             "org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry" = java
+            "org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener" = java
             "com.google.protobuf.Message" = proto
             "com.google.protobuf.GeneratedMessage" = proto
         }
     }
 }
+
+raft-test {
+    akka {
+
+        loglevel = "DEBUG"
+
+        actor {
+            # enable to test serialization only.
+            # serialize-messages = on
+
+            provider = "akka.remote.RemoteActorRefProvider"
+
+            serializers {
+              java  = "akka.serialization.JavaSerializer"
+              proto = "akka.remote.serialization.ProtobufSerializer"
+            }
+
+            serialization-bindings {
+                "org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry" = java
+                "org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener" = java
+                "com.google.protobuf.Message" = proto
+                "com.google.protobuf.GeneratedMessage" = proto
+            }
+        }
+
+        remote {
+                        log-remote-lifecycle-events = off
+                        netty.tcp {
+                            hostname = "127.0.0.1"
+                            port = 2550
+                        }
+                    }
+    }
+}
+
+raft-test-listener {
+
+  akka {
+    loglevel = "DEBUG"
+
+    actor {
+        provider = "akka.remote.RemoteActorRefProvider"
+    }
+
+    remote {
+        log-remote-lifecycle-events = off
+        netty.tcp {
+            hostname = "127.0.0.1"
+            port = 2554
+        }
+    }
+
+    member-id = "member-1"
+  }
+}
+
+
+
index 398a2e9..d95c9d5 100644 (file)
@@ -164,6 +164,11 @@ public class AbstractReplicatedLogImplTest {
             this.snapshotTerm = snapshotTerm;
         }
 
+        @Override
+        public int dataSize() {
+            return -1;
+        }
+
         public List<ReplicatedLogEntry> getEntriesTill(final int index) {
             return journal.subList(0, index);
         }
index 562ca21..2424d4d 100644 (file)
@@ -16,13 +16,12 @@ import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
-
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
 
 public class MockRaftActorContext implements RaftActorContext {
 
@@ -192,6 +191,11 @@ public class MockRaftActorContext implements RaftActorContext {
             append(replicatedLogEntry);
         }
 
+        @Override
+        public int dataSize() {
+            return -1;
+        }
+
         @Override public void removeFromAndPersist(long index) {
             removeFrom(index);
         }
@@ -222,6 +226,11 @@ public class MockRaftActorContext implements RaftActorContext {
             return this;
         }
 
+        @Override
+        public int size() {
+            return value.length();
+        }
+
         @Override public String getClientPayloadClassName() {
             return MockPayload.class.getName();
         }
@@ -256,6 +265,11 @@ public class MockRaftActorContext implements RaftActorContext {
         @Override public long getIndex() {
             return index;
         }
+
+        @Override
+        public int size() {
+            return getData().size();
+        }
     }
 
     public static class MockReplicatedLogBuilder {
index 9eb2fb7..c833a86 100644 (file)
@@ -21,11 +21,25 @@ import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
+import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -36,27 +50,13 @@ import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
@@ -83,6 +83,9 @@ public class RaftActorTest extends AbstractActorTest {
 
         private final DataPersistenceProvider dataPersistenceProvider;
         private final RaftActor delegate;
+        private final CountDownLatch recoveryComplete = new CountDownLatch(1);
+        private final List<Object> state;
+        private ActorRef roleChangeNotifier;
 
         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
             private static final long serialVersionUID = 1L;
@@ -90,25 +93,27 @@ public class RaftActorTest extends AbstractActorTest {
             private final String id;
             private final Optional<ConfigParams> config;
             private final DataPersistenceProvider dataPersistenceProvider;
+            private final ActorRef roleChangeNotifier;
 
             private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
-                    Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
+                Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
+                ActorRef roleChangeNotifier) {
                 this.peerAddresses = peerAddresses;
                 this.id = id;
                 this.config = config;
                 this.dataPersistenceProvider = dataPersistenceProvider;
+                this.roleChangeNotifier = roleChangeNotifier;
             }
 
             @Override
             public MockRaftActor create() throws Exception {
-                return new MockRaftActor(id, peerAddresses, config, dataPersistenceProvider);
+                MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
+                    dataPersistenceProvider);
+                mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
+                return mockRaftActor;
             }
         }
 
-        private final CountDownLatch recoveryComplete = new CountDownLatch(1);
-
-        private final List<Object> state;
-
         public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
             super(id, peerAddresses, config);
             state = new ArrayList<>();
@@ -134,23 +139,24 @@ public class RaftActorTest extends AbstractActorTest {
 
         public static Props props(final String id, final Map<String, String> peerAddresses,
                 Optional<ConfigParams> config){
-            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null));
+            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
         }
 
         public static Props props(final String id, final Map<String, String> peerAddresses,
                                   Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
-            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider));
+            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
         }
 
+        public static Props props(final String id, final Map<String, String> peerAddresses,
+            Optional<ConfigParams> config, ActorRef roleChangeNotifier){
+            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
+        }
 
         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
             delegate.applyState(clientActor, identifier, data);
             LOG.info("applyState called");
         }
 
-
-
-
         @Override
         protected void startLogRecoveryBatch(int maxBatchSize) {
         }
@@ -201,6 +207,11 @@ public class RaftActorTest extends AbstractActorTest {
             return this.dataPersistenceProvider;
         }
 
+        @Override
+        protected Optional<ActorRef> getRoleChangeNotifier() {
+            return Optional.fromNullable(roleChangeNotifier);
+        }
+
         @Override public String persistenceId() {
             return this.getId();
         }
@@ -862,6 +873,40 @@ public class RaftActorTest extends AbstractActorTest {
         };
     }
 
+    @Test
+    public void testRaftRoleChangeNotifier() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            ActorRef notifierActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+            DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+            String id = "testRaftRoleChangeNotifier";
+
+            TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(id,
+                Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), id);
+
+            MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+            mockRaftActor.setCurrentBehavior(new Follower(mockRaftActor.getRaftActorContext()));
+
+            // sleeping for a minimum of 2 seconds, if it spans more its fine.
+            Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+
+            List<Object> matches =  MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
+            assertNotNull(matches);
+            assertEquals(2, matches.size());
+
+            // check if the notifier got a role change from Follower to Candidate
+            RoleChanged raftRoleChanged = (RoleChanged) matches.get(0);
+            assertEquals(id, raftRoleChanged.getMemberId());
+            assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
+            assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
+
+            // check if the notifier got a role change from Candidate to Leader
+            raftRoleChanged = (RoleChanged) matches.get(1);
+            assertEquals(id, raftRoleChanged.getMemberId());
+            assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
+            assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
+        }};
+    }
+
     private ByteString fromObject(Object snapshot) throws Exception {
         ByteArrayOutputStream b = null;
         ObjectOutputStream o = null;
index 83b9ad3..0ee9693 100644 (file)
@@ -1,9 +1,20 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
@@ -20,19 +31,6 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
     private final ActorRef followerActor = getSystem().actorOf(Props.create(
@@ -452,18 +450,20 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             int offset = 0;
             int snapshotLength = bsSnapshot.size();
             int i = 1;
+            int chunkIndex = 1;
 
             do {
                 chunkData = getNextChunk(bsSnapshot, offset);
                 final InstallSnapshot installSnapshot =
                     new InstallSnapshot(1, "leader-1", i, 1,
-                        chunkData, i, 3);
+                        chunkData, chunkIndex, 3);
                 follower.handleMessage(leaderActor, installSnapshot);
                 offset = offset + 50;
                 i++;
+                chunkIndex++;
             } while ((offset+50) < snapshotLength);
 
-            final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, 3, 3);
+            final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, chunkIndex, 3);
             follower.handleMessage(leaderActor, installSnapshot3);
 
             String[] matches = new ReceiveWhile<String>(String.class, duration("2 seconds")) {
@@ -490,6 +490,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 }
             }.get();
 
+            // Verify that after a snapshot is successfully applied the collected snapshot chunks is reset to empty
+            assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
+
             String applySnapshotMatch = "";
             for (String reply: matches) {
                 if (reply.startsWith("applySnapshot")) {
@@ -517,6 +520,52 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         }};
     }
 
+    @Test
+    public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
+        JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {
+            {
+
+                ActorRef leaderActor = getSystem().actorOf(Props.create(
+                        MessageCollectorActor.class));
+
+                MockRaftActorContext context = (MockRaftActorContext)
+                        createActorContext(getRef());
+
+                Follower follower = (Follower) createBehavior(context);
+
+                HashMap<String, String> followerSnapshot = new HashMap<>();
+                followerSnapshot.put("1", "A");
+                followerSnapshot.put("2", "B");
+                followerSnapshot.put("3", "C");
+
+                ByteString bsSnapshot = toByteString(followerSnapshot);
+
+                final InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader-1", 3, 1, getNextChunk(bsSnapshot, 10), 3, 3);
+                follower.handleMessage(leaderActor, installSnapshot);
+
+                Object messages = executeLocalOperation(leaderActor, "get-all-messages");
+
+                assertNotNull(messages);
+                assertTrue(messages instanceof List);
+                List<Object> listMessages = (List<Object>) messages;
+
+                int installSnapshotReplyReceivedCount = 0;
+                for (Object message: listMessages) {
+                    if (message instanceof InstallSnapshotReply) {
+                        ++installSnapshotReplyReceivedCount;
+                    }
+                }
+
+                assertEquals(1, installSnapshotReplyReceivedCount);
+                InstallSnapshotReply reply = (InstallSnapshotReply) listMessages.get(0);
+                assertEquals(false, reply.isSuccess());
+                assertEquals(-1, reply.getChunkIndex());
+                assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
+
+
+            }};
+    }
+
     public Object executeLocalOperation(ActorRef actor, Object message) throws Exception {
         return MessageCollectorActor.getAllMessages(actor);
     }
index 0fc0b4c..895fe35 100644 (file)
@@ -1,11 +1,16 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -41,9 +46,6 @@ import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
 import scala.concurrent.duration.FiniteDuration;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 
 public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
@@ -125,7 +127,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     Map<String, String> peerAddresses = new HashMap<>();
 
                     peerAddresses.put(followerActor.path().toString(),
-                        followerActor.path().toString());
+                            followerActor.path().toString());
 
                     actorContext.setPeerAddresses(peerAddresses);
 
@@ -570,6 +572,156 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         }};
     }
 
+    @Test
+    public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            TestActorRef<MessageCollectorActor> followerActor =
+                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put(followerActor.path().toString(),
+                    followerActor.path().toString());
+
+            final int followersLastIndex = 2;
+            final int snapshotIndex = 3;
+            final int snapshotTerm = 1;
+            final int currentTerm = 2;
+
+            MockRaftActorContext actorContext =
+                    (MockRaftActorContext) createActorContext();
+
+            actorContext.setConfigParams(new DefaultConfigParamsImpl(){
+                @Override
+                public int getSnapshotChunkSize() {
+                    return 50;
+                }
+            });
+            actorContext.setPeerAddresses(peerAddresses);
+            actorContext.setCommitIndex(followersLastIndex);
+
+            MockLeader leader = new MockLeader(actorContext);
+
+            Map<String, String> leadersSnapshot = new HashMap<>();
+            leadersSnapshot.put("1", "A");
+            leadersSnapshot.put("2", "B");
+            leadersSnapshot.put("3", "C");
+
+            // set the snapshot variables in replicatedlog
+            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+            ByteString bs = toByteString(leadersSnapshot);
+            leader.setSnapshot(Optional.of(bs));
+
+            leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+            Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
+
+            assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+            InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+            assertEquals(1, installSnapshot.getChunkIndex());
+            assertEquals(3, installSnapshot.getTotalChunks());
+
+
+            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
+
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+
+            o = MessageCollectorActor.getAllMessages(followerActor).get(1);
+
+            assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+            installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+            assertEquals(1, installSnapshot.getChunkIndex());
+            assertEquals(3, installSnapshot.getTotalChunks());
+
+            followerActor.tell(PoisonPill.getInstance(), getRef());
+        }};
+    }
+
+    @Test
+    public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+
+                TestActorRef<MessageCollectorActor> followerActor =
+                        TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+
+                Map<String, String> peerAddresses = new HashMap<>();
+                peerAddresses.put(followerActor.path().toString(),
+                        followerActor.path().toString());
+
+                final int followersLastIndex = 2;
+                final int snapshotIndex = 3;
+                final int snapshotTerm = 1;
+                final int currentTerm = 2;
+
+                MockRaftActorContext actorContext =
+                        (MockRaftActorContext) createActorContext();
+
+                actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+                    @Override
+                    public int getSnapshotChunkSize() {
+                        return 50;
+                    }
+                });
+                actorContext.setPeerAddresses(peerAddresses);
+                actorContext.setCommitIndex(followersLastIndex);
+
+                MockLeader leader = new MockLeader(actorContext);
+
+                Map<String, String> leadersSnapshot = new HashMap<>();
+                leadersSnapshot.put("1", "A");
+                leadersSnapshot.put("2", "B");
+                leadersSnapshot.put("3", "C");
+
+                // set the snapshot variables in replicatedlog
+                actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+                actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+                actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+                ByteString bs = toByteString(leadersSnapshot);
+                leader.setSnapshot(Optional.of(bs));
+
+                leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+                Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
+
+                assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+                InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+                assertEquals(1, installSnapshot.getChunkIndex());
+                assertEquals(3, installSnapshot.getTotalChunks());
+                assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
+
+                int hashCode = installSnapshot.getData().hashCode();
+
+                leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
+
+                leader.handleMessage(leaderActor, new SendHeartBeat());
+
+                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+
+                o = MessageCollectorActor.getAllMessages(followerActor).get(1);
+
+                assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+                installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+                assertEquals(2, installSnapshot.getChunkIndex());
+                assertEquals(3, installSnapshot.getTotalChunks());
+                assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
+
+                followerActor.tell(PoisonPill.getInstance(), getRef());
+            }};
+    }
+
     @Test
     public void testFollowerToSnapshotLogic() {
 
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java
new file mode 100644 (file)
index 0000000..1b3a8f5
--- /dev/null
@@ -0,0 +1,181 @@
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import akka.event.LoggingAdapter;
+import com.google.common.base.Optional;
+import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SnapshotTrackerTest {
+
+    Map<String, String> data;
+    ByteString byteString;
+    ByteString chunk1;
+    ByteString chunk2;
+    ByteString chunk3;
+
+    @Before
+    public void setup(){
+        data = new HashMap<>();
+        data.put("key1", "value1");
+        data.put("key2", "value2");
+        data.put("key3", "value3");
+
+        byteString = toByteString(data);
+        chunk1 = getNextChunk(byteString, 0, 10);
+        chunk2 = getNextChunk(byteString, 10, 10);
+        chunk3 = getNextChunk(byteString, 20, byteString.size());
+    }
+
+    @Test
+    public void testAddChunk() throws SnapshotTracker.InvalidChunkException {
+        SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5);
+
+        tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
+        tracker1.addChunk(2, chunk2, Optional.<Integer>absent());
+        tracker1.addChunk(3, chunk3, Optional.<Integer>absent());
+
+        // Verify that an InvalidChunkException is thrown when we try to add a chunk to a sealed tracker
+        SnapshotTracker tracker2 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+        tracker2.addChunk(1, chunk1, Optional.<Integer>absent());
+        tracker2.addChunk(2, chunk2, Optional.<Integer>absent());
+
+        try {
+            tracker2.addChunk(3, chunk3, Optional.<Integer>absent());
+            Assert.fail();
+        } catch(SnapshotTracker.InvalidChunkException e){
+            e.getMessage().startsWith("Invalid chunk");
+        }
+
+        // The first chunk's index must at least be FIRST_CHUNK_INDEX
+        SnapshotTracker tracker3 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+        try {
+            tracker3.addChunk(AbstractLeader.FIRST_CHUNK_INDEX - 1, chunk1, Optional.<Integer>absent());
+            Assert.fail();
+        } catch(SnapshotTracker.InvalidChunkException e){
+
+        }
+
+        // Out of sequence chunk indexes won't work
+        SnapshotTracker tracker4 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+        tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
+
+        try {
+            tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX+2, chunk2, Optional.<Integer>absent());
+            Assert.fail();
+        } catch(SnapshotTracker.InvalidChunkException e){
+
+        }
+
+        // No exceptions will be thrown when invalid chunk is added with the right sequence
+        // If the lastChunkHashCode is missing
+        SnapshotTracker tracker5 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+        tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
+        // Look I can add the same chunk again
+        tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX + 1, chunk1, Optional.<Integer>absent());
+
+        // An exception will be thrown when an invalid chunk is addedd with the right sequence
+        // when the lastChunkHashCode is present
+        SnapshotTracker tracker6 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+        tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.of(-1));
+
+        try {
+            // Here we add a second chunk and tell addChunk that the previous chunk had a hash code 777
+            tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX + 1, chunk2, Optional.of(777));
+            Assert.fail();
+        }catch(SnapshotTracker.InvalidChunkException e){
+
+        }
+
+    }
+
+    @Test
+    public void testGetSnapShot() throws SnapshotTracker.InvalidChunkException {
+
+        // Trying to get a snapshot before all chunks have been received will throw an exception
+        SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5);
+
+        tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
+        try {
+            tracker1.getSnapshot();
+            Assert.fail();
+        } catch(IllegalStateException e){
+
+        }
+
+        SnapshotTracker tracker2 = new SnapshotTracker(mock(LoggingAdapter.class), 3);
+
+        tracker2.addChunk(1, chunk1, Optional.<Integer>absent());
+        tracker2.addChunk(2, chunk2, Optional.<Integer>absent());
+        tracker2.addChunk(3, chunk3, Optional.<Integer>absent());
+
+        byte[] snapshot = tracker2.getSnapshot();
+
+        assertEquals(byteString, ByteString.copyFrom(snapshot));
+    }
+
+    @Test
+    public void testGetCollectedChunks() throws SnapshotTracker.InvalidChunkException {
+        SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5);
+
+        ByteString chunks = chunk1.concat(chunk2);
+
+        tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
+        tracker1.addChunk(2, chunk2, Optional.<Integer>absent());
+
+        assertEquals(chunks, tracker1.getCollectedChunks());
+    }
+
+    public ByteString getNextChunk (ByteString bs, int offset, int size){
+        int snapshotLength = bs.size();
+        int start = offset;
+        if (size > snapshotLength) {
+            size = snapshotLength;
+        } else {
+            if ((start + size) > snapshotLength) {
+                size = snapshotLength - start;
+            }
+        }
+        return bs.substring(start, start + size);
+    }
+
+    private ByteString toByteString(Map<String, String> state) {
+        ByteArrayOutputStream b = null;
+        ObjectOutputStream o = null;
+        try {
+            try {
+                b = new ByteArrayOutputStream();
+                o = new ObjectOutputStream(b);
+                o.writeObject(state);
+                byte[] snapshotBytes = b.toByteArray();
+                return ByteString.copyFrom(snapshotBytes);
+            } finally {
+                if (o != null) {
+                    o.flush();
+                    o.close();
+                }
+                if (b != null) {
+                    b.close();
+                }
+            }
+        } catch (IOException e) {
+            org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
+        }
+        return null;
+    }
+
+
+}
\ No newline at end of file
index 9fbdd45..3469a95 100644 (file)
@@ -13,15 +13,14 @@ import akka.actor.UntypedActor;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
 
 public class MessageCollectorActor extends UntypedActor {
     private List<Object> messages = new ArrayList<>();
index ade3e1b..9346e22 100644 (file)
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.eclipse.xtend</groupId>
-      <artifactId>org.eclipse.xtend.lib</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.javassist</groupId>
       <artifactId>javassist</artifactId>
           </execution>
         </executions>
       </plugin>
-      <plugin>
-        <groupId>org.eclipse.xtend</groupId>
-        <artifactId>xtend-maven-plugin</artifactId>
-      </plugin>
       <plugin>
         <groupId>org.jacoco</groupId>
         <artifactId>jacoco-maven-plugin</artifactId>
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/DefaultRuntimeCodeGenerator.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/DefaultRuntimeCodeGenerator.java
new file mode 100644 (file)
index 0000000..dfa164b
--- /dev/null
@@ -0,0 +1,286 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.binding.codegen.impl;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSet.Builder;
+import java.lang.reflect.Method;
+import java.util.Map;
+import javassist.CannotCompileException;
+import javassist.ClassPool;
+import javassist.CtClass;
+import javassist.CtMethod;
+import javassist.NotFoundException;
+import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeSpecification;
+import org.opendaylight.yangtools.sal.binding.generator.util.ClassGenerator;
+import org.opendaylight.yangtools.sal.binding.generator.util.MethodGenerator;
+import org.opendaylight.yangtools.util.ClassLoaderUtils;
+import org.opendaylight.yangtools.yang.binding.BaseIdentity;
+import org.opendaylight.yangtools.yang.binding.Notification;
+import org.opendaylight.yangtools.yang.binding.NotificationListener;
+import org.opendaylight.yangtools.yang.binding.RpcImplementation;
+import org.opendaylight.yangtools.yang.binding.RpcService;
+import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
+
+final class DefaultRuntimeCodeGenerator extends AbstractRuntimeCodeGenerator {
+
+    DefaultRuntimeCodeGenerator(final ClassPool pool) {
+        super(pool);
+    }
+
+    @Override
+    protected <T extends RpcService> Supplier<T> directProxySupplier(final Class<T> serviceType) {
+        return new Supplier<T>() {
+            @SuppressWarnings("unchecked")
+            @Override
+            public T get() {
+                final String proxyName = RuntimeCodeSpecification.getDirectProxyName(serviceType);
+
+                final Class<?> potentialClass = ClassLoaderUtils.tryToLoadClassWithTCCL(proxyName);
+                if (potentialClass != null) {
+                    try {
+                        return (T)potentialClass.newInstance();
+                    } catch (InstantiationException | IllegalAccessException e) {
+                        throw new IllegalStateException("Failed to instantiate class " + potentialClass.getName(), e);
+                    }
+                }
+
+                final CtClass supertype = utils.asCtClass(serviceType);
+                final String directProxyName = RuntimeCodeSpecification.getDirectProxyName(serviceType);
+
+                final CtClass createdCls;
+                try {
+                    createdCls = utils.createClass(directProxyName, supertype, new ClassGenerator() {
+                        @Override
+                        public void process(final CtClass cls) throws CannotCompileException {
+                            utils.field(cls, RuntimeCodeSpecification.DELEGATE_FIELD, serviceType);
+                            utils.implementsType(cls, utils.asCtClass(RpcImplementation.class));
+                            utils.implementMethodsFrom(cls, supertype, new MethodGenerator() {
+                                @Override
+                                public void process(final CtMethod method) throws CannotCompileException {
+                                    final StringBuilder sb = new StringBuilder("\n");
+                                    sb.append("{\n");
+                                    sb.append("    if (").append(RuntimeCodeSpecification.DELEGATE_FIELD).append(" == null) {\n");
+                                    sb.append("        throw new java.lang.IllegalStateException(\"No default provider is available\");\n");
+                                    sb.append("    }\n");
+                                    sb.append("    return ($r) ").append(RuntimeCodeSpecification.DELEGATE_FIELD).append('.').append(method.getName()).append("($$);\n");
+                                    sb.append("}\n");
+                                    method.setBody(sb.toString());
+                                }
+                            });
+
+                            // FIXME: copy this one...
+                            utils.implementMethodsFrom(cls, utils.asCtClass(RpcImplementation.class), new MethodGenerator() {
+                                @Override
+                                public void process(final CtMethod method) throws CannotCompileException {
+                                    final StringBuilder sb = new StringBuilder("\n");
+                                    sb.append("{\n");
+                                    sb.append("    throw new java.lang.IllegalStateException(\"No provider is processing supplied message\");\n");
+                                    sb.append("    return ($r) null;\n");
+                                    sb.append("}\n");
+                                    method.setBody(sb.toString());
+                                }
+                            });
+                        }
+                    });
+                } catch (CannotCompileException e) {
+                    throw new IllegalStateException("Failed to create class " + directProxyName, e);
+                }
+
+                final Class<?> c;
+                try {
+                    c = createdCls.toClass(serviceType.getClassLoader(), serviceType.getProtectionDomain());
+                } catch (CannotCompileException e) {
+                    throw new IllegalStateException(String.format("Failed to create class %s", createdCls), e);
+                }
+
+                try {
+                    return (T) c.newInstance();
+                } catch (InstantiationException | IllegalAccessException e) {
+                    throw new IllegalStateException(String.format("Failed to instantiated class %s", c), e);
+                }
+            }
+        };
+    }
+
+    @Override
+    protected <T extends RpcService> Supplier<T> routerSupplier(final Class<T> serviceType, final RpcServiceMetadata metadata) {
+        return new Supplier<T>() {
+            @SuppressWarnings("unchecked")
+            @Override
+            public T get() {
+                final CtClass supertype = utils.asCtClass(serviceType);
+                final String routerName = RuntimeCodeSpecification.getRouterName(serviceType);
+                final Class<?> potentialClass = ClassLoaderUtils.tryToLoadClassWithTCCL(routerName);
+                if (potentialClass != null) {
+                    try {
+                        return (T)potentialClass.newInstance();
+                    } catch (InstantiationException | IllegalAccessException e) {
+                        throw new IllegalStateException("Failed to instantiate class", e);
+                    }
+                }
+
+                final CtClass targetCls;
+                try {
+                    targetCls = utils.createClass(routerName, supertype, new ClassGenerator() {
+                        @Override
+                        public void process(final CtClass cls) throws CannotCompileException {
+                            utils.field(cls, RuntimeCodeSpecification.DELEGATE_FIELD, serviceType);
+                            //utils.field(cls, REMOTE_INVOKER_FIELD,iface);
+                            utils.implementsType(cls, utils.asCtClass(RpcImplementation.class));
+
+                            for (final Class<? extends BaseIdentity> ctx : metadata.getContexts()) {
+                                utils.field(cls, RuntimeCodeSpecification.getRoutingTableField(ctx), Map.class);
+                            }
+
+                            utils.implementMethodsFrom(cls, supertype, new MethodGenerator() {
+                                @Override
+                                public void process(final CtMethod method) throws CannotCompileException {
+                                    final int ptl;
+                                    try {
+                                        ptl = method.getParameterTypes().length;
+                                    } catch (NotFoundException e) {
+                                        throw new CannotCompileException(e);
+                                    }
+                                    final StringBuilder sb = new StringBuilder();
+
+                                    switch (ptl) {
+                                    case 0:
+                                        sb.append("return ($r) ").append(RuntimeCodeSpecification.DELEGATE_FIELD).append('.').append(method.getName()).append("($$);");
+                                        break;
+                                    case 1:
+                                        final RpcMetadata rpcMeta = metadata.getRpcMethod(method.getName());
+                                        final String rtGetter = rpcMeta.getInputRouteGetter().getName();
+                                        final String stName = supertype.getName();
+
+                                        sb.append('\n');
+                                        sb.append("{\n");
+                                        sb.append("    if ($1 == null) {\n");
+                                        sb.append("        throw new IllegalArgumentException(\"RPC input must not be null and must contain a value for field ").append(rtGetter).append("\");\n");
+                                        sb.append("    }\n");
+                                        sb.append("    if ($1.").append(rtGetter).append("() == null) {\n");
+                                        sb.append("        throw new IllegalArgumentException(\"Field ").append(rtGetter).append(" must not be null\");\n");
+                                        sb.append("    }\n");
+
+                                        sb.append("    final org.opendaylight.yangtools.yang.binding.InstanceIdentifier identifier = $1.").append(rtGetter).append("()");
+                                        if (rpcMeta.isRouteEncapsulated()) {
+                                            sb.append(".getValue()");
+                                        }
+                                        sb.append(";\n");
+
+                                        sb.append("    ").append(supertype.getName()).append(" instance = (").append(stName).append(") ").append(RuntimeCodeSpecification.getRoutingTableField(rpcMeta.getContext())).append(".get(identifier);\n");
+                                        sb.append("    if (instance == null) {\n");
+                                        sb.append("        instance = ").append(RuntimeCodeSpecification.DELEGATE_FIELD).append(";\n");
+                                        sb.append("    }\n");
+
+                                        sb.append("    if (instance == null) {\n");
+                                        sb.append("        throw new java.lang.IllegalStateException(\"No routable provider is processing routed message for \" + String.valueOf(identifier));\n");
+                                        sb.append("    }\n");
+                                        sb.append("    return ($r) instance.").append(method.getName()).append("($$);\n");
+                                        sb.append('}');
+                                        break;
+                                    default:
+                                        throw new CannotCompileException(String.format("Unsupported parameters length %s", ptl));
+                                    }
+
+                                    method.setBody(sb.toString());
+                                }
+                            });
+
+                            // FIXME: move this into a template class
+                            utils.implementMethodsFrom(cls, utils.asCtClass(RpcImplementation.class), new MethodGenerator() {
+                                @Override
+                                public void process(final CtMethod method) throws CannotCompileException {
+                                    final StringBuilder sb = new StringBuilder("\n");
+                                    sb.append("{\n");
+                                    sb.append("    throw new java.lang.IllegalStateException(\"No provider is processing supplied message\");\n");
+                                    sb.append("    return ($r) null;\n");
+                                    sb.append("}\n");
+
+                                    method.setBody(sb.toString());
+                                }
+                            });
+                        }
+                    });
+                } catch (CannotCompileException e) {
+                    throw new IllegalStateException("Failed to create class " + routerName, e);
+                }
+
+                final Class<?> c;
+                try {
+                    c = targetCls.toClass(serviceType.getClassLoader(), serviceType.getProtectionDomain());
+                } catch (CannotCompileException e) {
+                    throw new IllegalStateException(String.format("Failed to compile class %s", targetCls), e);
+                }
+
+                try {
+                    return (T)c.newInstance();
+                } catch (InstantiationException | IllegalAccessException e) {
+                    throw new IllegalStateException(String.format("Failed to instantiate class %s", c), e);
+                }
+            }
+        };
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected RuntimeGeneratedInvokerPrototype generateListenerInvoker(final Class<? extends NotificationListener> listenerType) {
+        final String invokerName = RuntimeCodeSpecification.getInvokerName(listenerType);
+        final CtClass targetCls;
+
+        // Builder for a set of supported types. Filled while the target class is being generated
+        final Builder<Class<? extends Notification>> b = ImmutableSet.builder();
+
+        try {
+            targetCls = utils.createClass(invokerName, getBrokerNotificationListener(), new ClassGenerator() {
+                @Override
+                public void process(final CtClass cls) throws CannotCompileException {
+                    utils.field(cls, RuntimeCodeSpecification.DELEGATE_FIELD, listenerType);
+                    utils.implementMethodsFrom(cls, getBrokerNotificationListener(), new MethodGenerator() {
+                        @Override
+                        public void process(final CtMethod method) throws CannotCompileException {
+                            final StringBuilder sb = new StringBuilder("\n");
+
+                            sb.append("{\n");
+
+                            for (Method m : listenerType.getMethods()) {
+                                if (BindingReflections.isNotificationCallback(m)) {
+                                    final Class<?> argType = m.getParameterTypes()[0];
+
+                                    // populates builder above
+                                    b.add((Class<? extends Notification>) argType);
+
+                                    sb.append("    if ($1 instanceof ").append(argType.getName()).append(") {\n");
+                                    sb.append("        ").append(RuntimeCodeSpecification.DELEGATE_FIELD).append('.').append(m.getName()).append("((").append(argType.getName()).append(") $1);\n");
+                                    sb.append("        return null;\n");
+                                    sb.append("    } else ");
+                                }
+                            }
+
+                            sb.append("    return null;\n");
+                            sb.append("}\n");
+                            method.setBody(sb.toString());
+                        }
+                    });
+                }
+            });
+        } catch (CannotCompileException e) {
+            throw new IllegalStateException("Failed to create class " + invokerName, e);
+        }
+
+        final Class<?> finalClass;
+        try {
+            finalClass = targetCls.toClass(listenerType.getClassLoader(), listenerType.getProtectionDomain());
+        } catch (CannotCompileException e) {
+            throw new IllegalStateException(String.format("Failed to compile class %s", targetCls), e);
+        }
+
+        return new RuntimeGeneratedInvokerPrototype(b.build(), (Class<? extends org.opendaylight.controller.sal.binding.api.NotificationListener<?>>) finalClass);
+    }
+}
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/RuntimeCodeGenerator.xtend b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/RuntimeCodeGenerator.xtend
deleted file mode 100644 (file)
index 834eb4f..0000000
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.binding.codegen.impl
-
-import java.util.Map
-import javassist.ClassPool
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
-import org.opendaylight.yangtools.yang.binding.Notification
-import org.opendaylight.yangtools.yang.binding.RpcImplementation
-import org.opendaylight.yangtools.yang.binding.util.BindingReflections
-import org.opendaylight.yangtools.yang.binding.util.ClassLoaderUtils
-
-import static extension org.opendaylight.controller.sal.binding.codegen.RuntimeCodeSpecification.*
-import org.opendaylight.yangtools.yang.binding.RpcService
-
-class RuntimeCodeGenerator extends AbstractRuntimeCodeGenerator {
-
-    new(ClassPool pool) {
-        super(pool)
-    }
-
-    override directProxySupplier(Class iface) {
-        return [|
-            val proxyName = iface.directProxyName;
-            val potentialClass = ClassLoaderUtils.tryToLoadClassWithTCCL(proxyName)
-            if(potentialClass != null) {
-                return potentialClass.newInstance as RpcService;
-            }
-            val supertype = iface.asCtClass
-            val createdCls = createClass(iface.directProxyName, supertype) [
-                field(DELEGATE_FIELD, iface);
-                implementsType(RpcImplementation.asCtClass)
-                implementMethodsFrom(supertype) [
-                    body = '''
-                    {
-                        if(«DELEGATE_FIELD» == null) {
-                            throw new java.lang.IllegalStateException("No default provider is available");
-                        }
-                        return ($r) «DELEGATE_FIELD».«it.name»($$);
-                    }
-                    '''
-                ]
-                implementMethodsFrom(RpcImplementation.asCtClass) [
-                    body = '''
-                    {
-                        throw new java.lang.IllegalStateException("No provider is processing supplied message");
-                        return ($r) null;
-                    }
-                    '''
-                ]
-            ]
-            return createdCls.toClass(iface.classLoader).newInstance as RpcService
-        ]
-    }
-
-    override routerSupplier(Class iface, RpcServiceMetadata metadata) {
-        return [ |
-            val supertype = iface.asCtClass
-            val routerName = iface.routerName;
-            val potentialClass = ClassLoaderUtils.tryToLoadClassWithTCCL(routerName)
-            if(potentialClass != null) {
-                return potentialClass.newInstance as RpcService;
-            }
-
-            val targetCls = createClass(iface.routerName, supertype) [
-
-
-                field(DELEGATE_FIELD, iface)
-                //field(REMOTE_INVOKER_FIELD,iface);
-                implementsType(RpcImplementation.asCtClass)
-
-                for (ctx : metadata.contexts) {
-                    field(ctx.routingTableField, Map)
-                }
-                implementMethodsFrom(supertype) [
-                    if (parameterTypes.size === 1) {
-                        val rpcMeta = metadata.getRpcMethod(name);
-                        val bodyTmp = '''
-                        {
-                            if($1 == null) {
-                                throw new IllegalArgumentException("RPC input must not be null and must contain a value for field «rpcMeta.inputRouteGetter.name»");
-                            }
-                            if($1.«rpcMeta.inputRouteGetter.name»() == null) {
-                                throw new IllegalArgumentException("Field «rpcMeta.inputRouteGetter.name» must not be null");
-                            }
-                            final «InstanceIdentifier.name» identifier = $1.«rpcMeta.inputRouteGetter.name»()«IF rpcMeta.
-                            routeEncapsulated».getValue()«ENDIF»;
-                            «supertype.name» instance = («supertype.name») «rpcMeta.context.routingTableField».get(identifier);
-                            if(instance == null) {
-                               instance = «DELEGATE_FIELD»;
-                            }
-                            if(instance == null) {
-                                throw new java.lang.IllegalStateException("No routable provider is processing routed message for " + String.valueOf(identifier));
-                            }
-                            return ($r) instance.«it.name»($$);
-                        }'''
-                        body = bodyTmp
-                    } else if (parameterTypes.size === 0) {
-                        body = '''return ($r) «DELEGATE_FIELD».«it.name»($$);'''
-                    }
-                ]
-                implementMethodsFrom(RpcImplementation.asCtClass) [
-                    body = '''
-                    {
-                        throw new java.lang.IllegalStateException("No provider is processing supplied message");
-                        return ($r) null;
-                    }
-                    '''
-                ]
-            ]
-            return  targetCls.toClass(iface.classLoader,iface.protectionDomain).newInstance as RpcService
-        ];
-    }
-
-    override generateListenerInvoker(Class iface) {
-        val callbacks = iface.methods.filter[BindingReflections.isNotificationCallback(it)]
-
-        val supportedNotification = callbacks.map[parameterTypes.get(0) as Class<? extends Notification>].toSet;
-
-        val targetCls = createClass(iface.invokerName, brokerNotificationListener) [
-            field(DELEGATE_FIELD, iface)
-            implementMethodsFrom(brokerNotificationListener) [
-                body = '''
-                    {
-                        «FOR callback : callbacks SEPARATOR " else "»
-                            «val cls = callback.parameterTypes.get(0).name»
-                                if($1 instanceof «cls») {
-                                    «DELEGATE_FIELD».«callback.name»((«cls») $1);
-                                    return null;
-                                }
-                        «ENDFOR»
-                        return null;
-                    }
-                '''
-            ]
-        ]
-        val finalClass = targetCls.toClass(iface.classLoader, iface.protectionDomain)
-        return new RuntimeGeneratedInvokerPrototype(supportedNotification,
-            finalClass as Class<? extends org.opendaylight.controller.sal.binding.api.NotificationListener<?>>);
-    }
-}
index f037e67..4664b58 100644 (file)
@@ -33,7 +33,7 @@ public class SingletonHolder {
 
     public static final ClassPool CLASS_POOL = ClassPool.getDefault();
     public static final JavassistUtils JAVASSIST = JavassistUtils.forClassPool(CLASS_POOL);
-    public static final org.opendaylight.controller.sal.binding.codegen.impl.RuntimeCodeGenerator RPC_GENERATOR_IMPL = new org.opendaylight.controller.sal.binding.codegen.impl.RuntimeCodeGenerator(
+    public static final org.opendaylight.controller.sal.binding.codegen.impl.DefaultRuntimeCodeGenerator RPC_GENERATOR_IMPL = new org.opendaylight.controller.sal.binding.codegen.impl.DefaultRuntimeCodeGenerator(
             CLASS_POOL);
     public static final RuntimeCodeGenerator RPC_GENERATOR = RPC_GENERATOR_IMPL;
     public static final NotificationInvokerFactory INVOKER_FACTORY = RPC_GENERATOR_IMPL.getInvokerFactory();
@@ -5,7 +5,7 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.sal.binding.test;
+package org.opendaylight.controller.sal.binding.codegen.impl;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -13,17 +13,14 @@ import static org.junit.Assert.assertSame;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
-
 import java.util.ArrayList;
 import java.util.List;
-
 import javassist.ClassPool;
-
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter;
 import org.opendaylight.controller.sal.binding.api.rpc.RpcRoutingTable;
-import org.opendaylight.controller.sal.binding.codegen.impl.RuntimeCodeGenerator;
+import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeGenerator;
 import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory;
 import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker;
 import org.opendaylight.controller.sal.binding.test.mock.BarListener;
@@ -41,14 +38,14 @@ import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 
-public class RuntimeCodeGeneratorTest {
+public class DefaultRuntimeCodeGeneratorTest {
 
     private RuntimeCodeGenerator codeGenerator;
     private NotificationInvokerFactory invokerFactory;
 
     @Before
     public void initialize() {
-        this.codeGenerator = new RuntimeCodeGenerator(ClassPool.getDefault());
+        this.codeGenerator = new DefaultRuntimeCodeGenerator(ClassPool.getDefault());
         this.invokerFactory = codeGenerator.getInvokerFactory();
     }
 
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListener.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListener.java
new file mode 100644 (file)
index 0000000..1faa341
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.notifications;
+
+import java.io.Serializable;
+
+/**
+ * Message sent from the listener of Role Change messages to register itself to the Role Change Notifier
+ *
+ * The Listener could be in a separate ActorSystem and hence this message needs to be Serializable
+ */
+public class RegisterRoleChangeListener implements Serializable {
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListenerReply.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListenerReply.java
new file mode 100644 (file)
index 0000000..f1d13e3
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.notifications;
+
+import java.io.Serializable;
+
+/**
+ * Reply message sent from a RoleChangeNotifier to the Role Change Listener
+ *
+ * Can be sent to a separate actor system and hence should be made serializable.
+ */
+public class RegisterRoleChangeListenerReply implements Serializable {
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotification.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotification.java
new file mode 100644 (file)
index 0000000..de2733f
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.notifications;
+
+import java.io.Serializable;
+
+/**
+ * Notification message representing a Role change of a cluster member
+ *
+ * Roles generally are Leader, Follower and Candidate. But can be based on the consensus strategy/implementation
+ *
+ * The Listener could be in a separate ActorSystem and hence this message needs to be Serializable
+ */
+public class RoleChangeNotification implements Serializable {
+    private String memberId;
+    private String oldRole;
+    private String newRole;
+
+    public RoleChangeNotification(String memberId, String oldRole, String newRole) {
+        this.memberId = memberId;
+        this.oldRole = oldRole;
+        this.newRole = newRole;
+    }
+
+    public String getMemberId() {
+        return memberId;
+    }
+
+    public String getOldRole() {
+        return oldRole;
+    }
+
+    public String getNewRole() {
+        return newRole;
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java
new file mode 100644 (file)
index 0000000..a9aa561
--- /dev/null
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.notifications;
+
+import akka.actor.Actor;
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.japi.Creator;
+import akka.serialization.Serialization;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+
+/**
+ * The RoleChangeNotifier is responsible for receiving Raft role change messages and notifying
+ * the listeners (within the same node), which are registered with it.
+ * <p/>
+ * The RoleChangeNotifier is instantiated by the Shard and injected into the RaftActor.
+ */
+public class RoleChangeNotifier extends AbstractUntypedActor implements AutoCloseable {
+
+    private String memberId;
+    private Map<ActorPath, ActorRef> registeredListeners = Maps.newHashMap();
+    private RoleChangeNotification latestRoleChangeNotification = null;
+
+    public RoleChangeNotifier(String memberId) {
+        this.memberId = memberId;
+    }
+
+    public static Props getProps(final String memberId) {
+        return Props.create(new Creator<Actor>() {
+            @Override
+            public Actor create() throws Exception {
+                return new RoleChangeNotifier(memberId);
+            }
+        });
+    }
+
+    @Override
+    public void preStart() throws Exception {
+        super.preStart();
+        LOG.info("RoleChangeNotifier:{} created and ready for shard:{}",
+            Serialization.serializedActorPath(getSelf()), memberId);
+    }
+
+    @Override
+    protected void handleReceive(Object message) throws Exception {
+        if (message instanceof RegisterRoleChangeListener) {
+            // register listeners for this shard
+
+            ActorRef curRef = registeredListeners.get(getSender().path());
+            if (curRef != null) {
+                // ActorPaths would pass equal even if the unique id of the actors are different
+                // if a listener actor is re-registering after reincarnation, then removing the existing
+                // entry so the actor path with correct unique id is registered.
+                registeredListeners.remove(getSender().path());
+            }
+            registeredListeners.put(getSender().path(), getSender());
+
+            LOG.info("RoleChangeNotifier for {} , registered listener {}", memberId,
+                getSender().path().toString());
+
+            getSender().tell(new RegisterRoleChangeListenerReply(), getSelf());
+
+            if (latestRoleChangeNotification != null) {
+                getSender().tell(latestRoleChangeNotification, getSelf());
+            }
+
+
+        } else if (message instanceof RoleChanged) {
+            // this message is sent by RaftActor. Notify registered listeners when this message is received.
+            RoleChanged roleChanged = (RoleChanged) message;
+
+            LOG.info("RoleChangeNotifier for {} , received role change from {} to {}", memberId,
+                roleChanged.getOldRole(), roleChanged.getNewRole());
+
+            latestRoleChangeNotification =
+                new RoleChangeNotification(roleChanged.getMemberId(),
+                    roleChanged.getOldRole(), roleChanged.getNewRole());
+
+            for (ActorRef listener: registeredListeners.values()) {
+                listener.tell(latestRoleChangeNotification, getSelf());
+            }
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        registeredListeners.clear();
+    }
+}
+
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChanged.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChanged.java
new file mode 100644 (file)
index 0000000..f315bfd
--- /dev/null
@@ -0,0 +1,31 @@
+package org.opendaylight.controller.cluster.notifications;
+
+/**
+ * Role Change message initiated internally from the  Raft Actor when a the behavior/role changes.
+ *
+ * Since its internal , need not be serialized
+ *
+ */
+public class RoleChanged {
+    private String memberId;
+    private String oldRole;
+    private String newRole;
+
+    public RoleChanged(String memberId, String oldRole, String newRole) {
+        this.memberId = memberId;
+        this.oldRole = oldRole;
+        this.newRole = newRole;
+    }
+
+    public String getMemberId() {
+        return memberId;
+    }
+
+    public String getOldRole() {
+        return oldRole;
+    }
+
+    public String getNewRole() {
+        return newRole;
+    }
+}
index aadc362..075c607 100644 (file)
@@ -12,12 +12,11 @@ import com.google.common.base.Preconditions;
 import com.google.protobuf.GeneratedMessage;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.UnknownFieldSet;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
-
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
 
 public class CompositeModificationPayload extends Payload implements
     Serializable {
@@ -73,4 +72,8 @@ public class CompositeModificationPayload extends Payload implements
     public Object getModification(){
         return this.modification;
     }
+
+    public int size(){
+        return this.modification.getSerializedSize();
+    }
 }
index 502c338..7df5308 100644 (file)
@@ -10,9 +10,8 @@ package org.opendaylight.controller.cluster.raft.protobuff.client.messages;
 
 
 import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-
 import java.util.Map;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 
 /**
  * An instance of a Payload class is meant to be used as the Payload for
@@ -81,6 +80,8 @@ public abstract class Payload {
     public abstract Payload decode(
         AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payload);
 
+    public abstract int size();
+
 
 
 }
index b93be3e..de7f44e 100644 (file)
@@ -85,6 +85,16 @@ public final class InstallSnapshotMessages {
      * <code>optional int32 totalChunks = 7;</code>
      */
     int getTotalChunks();
+
+    // optional int32 lastChunkHashCode = 8;
+    /**
+     * <code>optional int32 lastChunkHashCode = 8;</code>
+     */
+    boolean hasLastChunkHashCode();
+    /**
+     * <code>optional int32 lastChunkHashCode = 8;</code>
+     */
+    int getLastChunkHashCode();
   }
   /**
    * Protobuf type {@code org.opendaylight.controller.cluster.raft.InstallSnapshot}
@@ -172,6 +182,11 @@ public final class InstallSnapshotMessages {
               totalChunks_ = input.readInt32();
               break;
             }
+            case 64: {
+              bitField0_ |= 0x00000080;
+              lastChunkHashCode_ = input.readInt32();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -351,6 +366,22 @@ public final class InstallSnapshotMessages {
       return totalChunks_;
     }
 
+    // optional int32 lastChunkHashCode = 8;
+    public static final int LASTCHUNKHASHCODE_FIELD_NUMBER = 8;
+    private int lastChunkHashCode_;
+    /**
+     * <code>optional int32 lastChunkHashCode = 8;</code>
+     */
+    public boolean hasLastChunkHashCode() {
+      return ((bitField0_ & 0x00000080) == 0x00000080);
+    }
+    /**
+     * <code>optional int32 lastChunkHashCode = 8;</code>
+     */
+    public int getLastChunkHashCode() {
+      return lastChunkHashCode_;
+    }
+
     private void initFields() {
       term_ = 0L;
       leaderId_ = "";
@@ -359,6 +390,7 @@ public final class InstallSnapshotMessages {
       data_ = com.google.protobuf.ByteString.EMPTY;
       chunkIndex_ = 0;
       totalChunks_ = 0;
+      lastChunkHashCode_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -393,6 +425,9 @@ public final class InstallSnapshotMessages {
       if (((bitField0_ & 0x00000040) == 0x00000040)) {
         output.writeInt32(7, totalChunks_);
       }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        output.writeInt32(8, lastChunkHashCode_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -430,6 +465,10 @@ public final class InstallSnapshotMessages {
         size += com.google.protobuf.CodedOutputStream
           .computeInt32Size(7, totalChunks_);
       }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(8, lastChunkHashCode_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -560,6 +599,8 @@ public final class InstallSnapshotMessages {
         bitField0_ = (bitField0_ & ~0x00000020);
         totalChunks_ = 0;
         bitField0_ = (bitField0_ & ~0x00000040);
+        lastChunkHashCode_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000080);
         return this;
       }
 
@@ -616,6 +657,10 @@ public final class InstallSnapshotMessages {
           to_bitField0_ |= 0x00000040;
         }
         result.totalChunks_ = totalChunks_;
+        if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+          to_bitField0_ |= 0x00000080;
+        }
+        result.lastChunkHashCode_ = lastChunkHashCode_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -655,6 +700,9 @@ public final class InstallSnapshotMessages {
         if (other.hasTotalChunks()) {
           setTotalChunks(other.getTotalChunks());
         }
+        if (other.hasLastChunkHashCode()) {
+          setLastChunkHashCode(other.getLastChunkHashCode());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -957,6 +1005,39 @@ public final class InstallSnapshotMessages {
         return this;
       }
 
+      // optional int32 lastChunkHashCode = 8;
+      private int lastChunkHashCode_ ;
+      /**
+       * <code>optional int32 lastChunkHashCode = 8;</code>
+       */
+      public boolean hasLastChunkHashCode() {
+        return ((bitField0_ & 0x00000080) == 0x00000080);
+      }
+      /**
+       * <code>optional int32 lastChunkHashCode = 8;</code>
+       */
+      public int getLastChunkHashCode() {
+        return lastChunkHashCode_;
+      }
+      /**
+       * <code>optional int32 lastChunkHashCode = 8;</code>
+       */
+      public Builder setLastChunkHashCode(int value) {
+        bitField0_ |= 0x00000080;
+        lastChunkHashCode_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 lastChunkHashCode = 8;</code>
+       */
+      public Builder clearLastChunkHashCode() {
+        bitField0_ = (bitField0_ & ~0x00000080);
+        lastChunkHashCode_ = 0;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.cluster.raft.InstallSnapshot)
     }
 
@@ -983,13 +1064,14 @@ public final class InstallSnapshotMessages {
   static {
     java.lang.String[] descriptorData = {
       "\n\025InstallSnapshot.proto\022(org.opendayligh" +
-      "t.controller.cluster.raft\"\235\001\n\017InstallSna" +
+      "t.controller.cluster.raft\"\270\001\n\017InstallSna" +
       "pshot\022\014\n\004term\030\001 \001(\003\022\020\n\010leaderId\030\002 \001(\t\022\031\n" +
       "\021lastIncludedIndex\030\003 \001(\003\022\030\n\020lastIncluded" +
       "Term\030\004 \001(\003\022\014\n\004data\030\005 \001(\014\022\022\n\nchunkIndex\030\006" +
-      " \001(\005\022\023\n\013totalChunks\030\007 \001(\005BX\n;org.openday" +
-      "light.controller.protobuff.messages.clus" +
-      "ter.raftB\027InstallSnapshotMessagesH\001"
+      " \001(\005\022\023\n\013totalChunks\030\007 \001(\005\022\031\n\021lastChunkHa" +
+      "shCode\030\010 \001(\005BX\n;org.opendaylight.control" +
+      "ler.protobuff.messages.cluster.raftB\027Ins" +
+      "tallSnapshotMessagesH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -1001,7 +1083,7 @@ public final class InstallSnapshotMessages {
           internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor,
-              new java.lang.String[] { "Term", "LeaderId", "LastIncludedIndex", "LastIncludedTerm", "Data", "ChunkIndex", "TotalChunks", });
+              new java.lang.String[] { "Term", "LeaderId", "LastIncludedIndex", "LastIncludedTerm", "Data", "ChunkIndex", "TotalChunks", "LastChunkHashCode", });
           return null;
         }
       };
index 4198644..adb58ae 100644 (file)
@@ -12,4 +12,5 @@ message InstallSnapshot {
     optional bytes data = 5;
     optional int32 chunkIndex = 6;
     optional int32 totalChunks = 7;
+    optional int32 lastChunkHashCode = 8;
 }
index 99df01a..d3abd38 100644 (file)
@@ -21,6 +21,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListEntryNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
@@ -37,8 +38,10 @@ import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
 
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import static org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes.mapEntry;
@@ -103,7 +106,10 @@ public class TestModel {
   private static final String TWO_ONE_NAME = "one";
   private static final String TWO_TWO_NAME = "two";
   private static final String DESC = "Hello there";
-
+  private static final Long LONG_ID = 1L;
+  private static final Boolean ENABLED = false;
+  private static final Short SHORT_ID = 1;
+  private static final Byte BYTE_ID = 1;
   // Family specific constants
   public static final QName FAMILY_QNAME =
       QName
@@ -142,6 +148,7 @@ public class TestModel {
   private static final String FIRST_GRAND_CHILD_NAME = "first grand child";
   private static final String SECOND_GRAND_CHILD_NAME = "second grand child";
 
+
   // first child
   private static final YangInstanceIdentifier CHILDREN_1_PATH =
       YangInstanceIdentifier.builder(CHILDREN_PATH)
@@ -364,30 +371,45 @@ public class TestModel {
                       QName.create(TEST_QNAME, "my-bits"))).withValue(
               ImmutableSet.of("foo", "bar"));
 
+      // Create unkey list entry
+      UnkeyedListEntryNode binaryDataKey =
+          Builders.unkeyedListEntryBuilder().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME)).
+              withChild(ImmutableNodes.leafNode(SOME_BINARY_DATE_QNAME, DESC)).build();
+
+      Map<QName, Object> keyValues = new HashMap<>();
+      keyValues.put(CHILDREN_QNAME, FIRST_CHILD_NAME);
+
 
       // Create the document
       return ImmutableContainerNodeBuilder
               .create()
               .withNodeIdentifier(
-                      new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME))
+                  new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME))
               .withChild(myBits.build())
               .withChild(ImmutableNodes.leafNode(DESC_QNAME, DESC))
-              .withChild(ImmutableNodes.leafNode(POINTER_QNAME, "pointer"))
+              .withChild(ImmutableNodes.leafNode(POINTER_QNAME, ENABLED))
+              .withChild(ImmutableNodes.leafNode(POINTER_QNAME, SHORT_ID))
+              .withChild(ImmutableNodes.leafNode(POINTER_QNAME, BYTE_ID))
               .withChild(
-                      ImmutableNodes.leafNode(SOME_REF_QNAME, YangInstanceIdentifier
-                              .builder().build()))
+                  ImmutableNodes.leafNode(SOME_REF_QNAME, GRAND_CHILD_1_PATH))
               .withChild(ImmutableNodes.leafNode(MYIDENTITY_QNAME, DESC_QNAME))
-
+               .withChild(Builders.unkeyedListBuilder().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(OUTER_LIST_QNAME))
+                   .withChild(binaryDataKey).build())
+               .withChild(Builders.orderedMapBuilder()
+                   .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME)).withChild(mapEntry).build())
+               .withChild(Builders.choiceBuilder().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME))
+                   .withChild(ImmutableNodes.leafNode(DESC_QNAME, LONG_ID)).build())
                       // .withChild(augmentationNode)
               .withChild(shoes)
               .withChild(numbers)
               .withChild(switchFeatures)
               .withChild(
-                      mapNodeBuilder(AUGMENTED_LIST_QNAME).withChild(mapEntry).build())
+                  mapNodeBuilder(AUGMENTED_LIST_QNAME).withChild(mapEntry).build())
               .withChild(
-                      mapNodeBuilder(OUTER_LIST_QNAME)
-                              .withChild(mapEntry(OUTER_LIST_QNAME, ID_QNAME, ONE_ID))
-                              .withChild(BAR_NODE).build());
+                  mapNodeBuilder(OUTER_LIST_QNAME)
+                      .withChild(mapEntry(OUTER_LIST_QNAME, ID_QNAME, ONE_ID))
+                      .withChild(BAR_NODE).build()
+              );
   }
 
   public static ContainerNode createTestContainer() {
index e18c00e..daba3fd 100644 (file)
@@ -8,17 +8,16 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.util.Timeout;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationReader;
 import org.opendaylight.controller.cluster.datastore.config.FileConfigurationReader;
 import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
-import akka.util.Timeout;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.util.concurrent.TimeUnit;
-
 /**
  * Contains contextual data for a data store.
  *
@@ -120,6 +119,7 @@ public class DatastoreContext {
         private boolean persistent = true;
         private ConfigurationReader configurationReader = new FileConfigurationReader();
         private int shardIsolatedLeaderCheckIntervalInMillis = shardHeartbeatIntervalInMillis * 10;
+        private int shardSnapshotDataThresholdPercentage = 12;
 
         public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) {
             this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
@@ -156,6 +156,12 @@ public class DatastoreContext {
             return this;
         }
 
+        public Builder shardSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
+            this.shardSnapshotDataThresholdPercentage = shardSnapshotDataThresholdPercentage;
+            return this;
+        }
+
+
         public Builder shardHeartbeatIntervalInMillis(int shardHeartbeatIntervalInMillis) {
             this.shardHeartbeatIntervalInMillis = shardHeartbeatIntervalInMillis;
             return this;
@@ -191,12 +197,14 @@ public class DatastoreContext {
             return this;
         }
 
+
         public DatastoreContext build() {
             DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
             raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
                     TimeUnit.MILLISECONDS));
             raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
             raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
+            raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
             raftConfig.setIsolatedLeaderCheckInterval(
                 new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
 
index d53cb48..af16d02 100644 (file)
@@ -65,6 +65,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
@@ -131,6 +132,8 @@ public class Shard extends RaftActor {
 
     private Cancellable txCommitTimeoutCheckSchedule;
 
+    private Optional<ActorRef> roleChangeNotifier;
+
     /**
      * Coordinates persistence recovery on startup.
      */
@@ -171,6 +174,9 @@ public class Shard extends RaftActor {
 
         transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+
+        // create a notifier actor for each cluster member
+        roleChangeNotifier = createRoleChangeNotifier(name.toString());
     }
 
     private static Map<String, String> mapPeerAddresses(
@@ -196,6 +202,12 @@ public class Shard extends RaftActor {
         return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
     }
 
+    private Optional<ActorRef> createRoleChangeNotifier(String shardId) {
+        ActorRef shardRoleChangeNotifier = this.getContext().actorOf(
+            RoleChangeNotifier.getProps(shardId), shardId + "-notifier");
+        return Optional.<ActorRef>of(shardRoleChangeNotifier);
+    }
+
     @Override
     public void postStop() {
         super.postStop();
@@ -259,6 +271,11 @@ public class Shard extends RaftActor {
         }
     }
 
+    @Override
+    protected Optional<ActorRef> getRoleChangeNotifier() {
+        return roleChangeNotifier;
+    }
+
     private void handleTransactionCommitTimeoutCheck() {
         CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
         if(cohortEntry != null) {
@@ -771,6 +788,7 @@ public class Shard extends RaftActor {
 
         shardMBean.setCommitIndex(getCommitIndex());
         shardMBean.setLastApplied(getLastApplied());
+        shardMBean.setDataSize(getRaftActorContext().getReplicatedLog().dataSize());
     }
 
     @Override
index 9decd82..4fc2ed2 100644 (file)
@@ -62,6 +62,8 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
 
     private QueuedNotificationManagerMXBeanImpl notificationManagerStatsBean;
 
+    private volatile long dataSize = 0;
+
     private final SimpleDateFormat sdf =
         new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
 
@@ -218,6 +220,15 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
         this.lastCommittedTransactionTime = lastCommittedTransactionTime;
     }
 
+    public void setDataSize(long dataSize){
+        this.dataSize = dataSize;
+    }
+
+    @Override
+    public long getDataSize(){
+        return dataSize;
+    }
+
     @Override
     public ThreadExecutorStats getDataStoreExecutorStats() {
         // FIXME: this particular thing does not work, as it really is DS-specific
index 4d3d438..367d4f4 100644 (file)
@@ -54,6 +54,12 @@ module distributed-datastore-provider {
         }
     }
 
+    typedef percentage {
+        type uint8 {
+            range "0..100";
+        }
+    }
+
     grouping data-store-properties {
         leaf max-shard-data-change-executor-queue-size {
             default 1000;
@@ -88,9 +94,16 @@ module distributed-datastore-provider {
          leaf shard-snapshot-batch-count {
             default 20000;
             type non-zero-uint32-type;
-            description "The minimum number of entries to be present in the in-memory journal log before a snapshot to be taken.";
+            description "The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken.";
          }
 
+         leaf shard-snapshot-data-threshold-percentage {
+            default 12;
+            type percentage;
+            description "The percentage of Runtime.totalMemory() used by the in-memory journal log before a snapshot is to be taken";
+         }
+
+
          leaf shard-hearbeat-interval-in-millis {
             default 500;
             type heartbeat-interval-type;
index 04d889f..9e02223 100644 (file)
@@ -1,5 +1,11 @@
 package org.opendaylight.controller.cluster.datastore;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -13,13 +19,6 @@ import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
 public class CompositeModificationPayloadTest {
 
 
@@ -60,6 +59,11 @@ public class CompositeModificationPayloadTest {
             @Override public long getIndex() {
                 return 1;
             }
+
+            @Override
+            public int size() {
+                return getData().size();
+            }
         });
 
         AppendEntries appendEntries =
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java
new file mode 100644 (file)
index 0000000..4e61260
--- /dev/null
@@ -0,0 +1,79 @@
+package org.opendaylight.controller.cluster.datastore;
+
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
+import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class RoleChangeNotifierTest extends AbstractActorTest  {
+
+    @Test
+    public void testHandleRegisterRoleChangeListener() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            String memberId = "testHandleRegisterRoleChangeListener";
+            ActorRef listenerActor =  getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+            TestActorRef<RoleChangeNotifier> notifierTestActorRef = TestActorRef.create(
+                getSystem(), RoleChangeNotifier.getProps(memberId), memberId);
+
+            notifierTestActorRef.tell(new RegisterRoleChangeListener(), listenerActor);
+
+            RegisterRoleChangeListenerReply reply = (RegisterRoleChangeListenerReply)
+                MessageCollectorActor.getFirstMatching(listenerActor, RegisterRoleChangeListenerReply.class);
+            assertNotNull(reply);
+
+            RoleChangeNotification notification = (RoleChangeNotification)
+                MessageCollectorActor.getFirstMatching(listenerActor, RoleChangeNotification.class);
+            assertNull(notification);
+        }};
+
+    }
+
+    @Test
+    public void testHandleRaftRoleChanged() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            String memberId = "testHandleRegisterRoleChangeListenerWithNotificationSet";
+            ActorRef listenerActor =  getSystem().actorOf(Props.create(MessageCollectorActor.class));
+            ActorRef shardActor =  getTestActor();
+
+            TestActorRef<RoleChangeNotifier> notifierTestActorRef = TestActorRef.create(
+                getSystem(), RoleChangeNotifier.getProps(memberId), memberId);
+
+            RoleChangeNotifier roleChangeNotifier = notifierTestActorRef.underlyingActor();
+
+            notifierTestActorRef.tell(new RoleChanged(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), shardActor);
+
+            // no notification should be sent as listener has not yet registered
+            assertNull(MessageCollectorActor.getFirstMatching(listenerActor, RoleChangeNotification.class));
+
+            // listener registers after role has been changed, ensure we sent the latest role change after a reply
+            notifierTestActorRef.tell(new RegisterRoleChangeListener(), listenerActor);
+
+            RegisterRoleChangeListenerReply reply = (RegisterRoleChangeListenerReply)
+                MessageCollectorActor.getFirstMatching(listenerActor, RegisterRoleChangeListenerReply.class);
+            assertNotNull(reply);
+
+            RoleChangeNotification notification = (RoleChangeNotification)
+                MessageCollectorActor.getFirstMatching(listenerActor, RoleChangeNotification.class);
+            assertNotNull(notification);
+            assertEquals(RaftState.Candidate.name(), notification.getOldRole());
+            assertEquals(RaftState.Leader.name(), notification.getNewRole());
+
+        }};
+
+    }
+}
+
+
index f75aa54..4bd0ad8 100644 (file)
@@ -8,10 +8,19 @@
 
 package org.opendaylight.controller.cluster.datastore.utils;
 
+import akka.actor.ActorRef;
 import akka.actor.UntypedActor;
 
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * MessageCollectorActor collects messages as it receives them. It can send
@@ -27,10 +36,55 @@ public class MessageCollectorActor extends UntypedActor {
     @Override public void onReceive(Object message) throws Exception {
         if(message instanceof String){
             if("messages".equals(message)){
-                getSender().tell(messages, getSelf());
+                getSender().tell(new ArrayList(messages), getSelf());
             }
         } else {
             messages.add(message);
         }
     }
+
+    public static List<Object> getAllMessages(ActorRef actor) throws Exception {
+        FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
+        Timeout operationTimeout = new Timeout(operationDuration);
+        Future<Object> future = Patterns.ask(actor, "messages", operationTimeout);
+
+        try {
+            return (List<Object>) Await.result(future, operationDuration);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    /**
+     * Get the first message that matches the specified class
+     * @param actor
+     * @param clazz
+     * @return
+     */
+    public static Object getFirstMatching(ActorRef actor, Class<?> clazz) throws Exception {
+        List<Object> allMessages = getAllMessages(actor);
+
+        for(Object message : allMessages){
+            if(message.getClass().equals(clazz)){
+                return message;
+            }
+        }
+
+        return null;
+    }
+
+    public static List<Object> getAllMatching(ActorRef actor, Class<?> clazz) throws Exception {
+        List<Object> allMessages = getAllMessages(actor);
+
+        List<Object> output = Lists.newArrayList();
+
+        for(Object message : allMessages){
+            if(message.getClass().equals(clazz)){
+                output.add(message);
+            }
+        }
+
+        return output;
+    }
+
 }
index a2b78c6..a3041e8 100644 (file)
@@ -13,19 +13,18 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import com.typesafe.config.ConfigFactory;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import java.util.ArrayList;
+import java.util.List;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
-import java.util.ArrayList;
-import java.util.List;
-
 public class Client {
 
     private static ActorSystem actorSystem;
@@ -93,6 +92,11 @@ public class Client {
             @Override public long getIndex() {
                 return 1;
             }
+
+            @Override
+            public int size() {
+                return getData().size();
+            }
         });
 
         return new AppendEntries(1, "member-1", 0, 100, modification, 1);
@@ -113,6 +117,11 @@ public class Client {
             @Override public long getIndex() {
                 return 1;
             }
+
+            @Override
+            public int size() {
+                return getData().size();
+            }
         });
 
         return new AppendEntries(1, "member-1", 0, 100, modification, 1);
index b50dfb1..6ffe147 100644 (file)
@@ -15,26 +15,24 @@ import akka.actor.Props;
 import akka.cluster.ClusterActorRefProvider;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import org.opendaylight.controller.utils.ConditionalProbe;
-
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import org.opendaylight.controller.utils.ConditionalProbe;
 
 /**
  * A store that syncs its data across nodes in the cluster.
@@ -82,8 +80,9 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
         ActorRefProvider provider = getContext().provider();
         selfAddress = provider.getDefaultAddress();
 
-        if ( provider instanceof ClusterActorRefProvider)
+        if ( provider instanceof ClusterActorRefProvider) {
             getContext().actorOf(Props.create(Gossiper.class).withMailbox(config.getMailBoxName()), "gossiper");
+        }
     }
 
 
@@ -94,8 +93,11 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
         }
 
         if (message instanceof ConditionalProbe) {
+            // The ConditionalProbe is only used for unit tests.
             log.info("Received probe {} {}", getSelf(), message);
             probe = (ConditionalProbe) message;
+            // Send back any message to tell the caller we got the probe.
+            getSender().tell("Got it", getSelf());
         } else if (message instanceof UpdateBucket) {
             receiveUpdateBucket(((UpdateBucket) message).getBucket());
         } else if (message instanceof GetAllBuckets) {
@@ -184,13 +186,15 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
         Map<Address, Bucket> buckets = new HashMap<>();
 
         //first add the local bucket if asked
-        if (members.contains(selfAddress))
+        if (members.contains(selfAddress)) {
             buckets.put(selfAddress, localBucket);
+        }
 
         //then get buckets for requested remote nodes
         for (Address address : members){
-            if (remoteBuckets.containsKey(address))
+            if (remoteBuckets.containsKey(address)) {
                 buckets.put(address, remoteBuckets.get(address));
+            }
         }
 
         return buckets;
@@ -214,7 +218,9 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
     void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
 
         if (receivedBuckets == null || receivedBuckets.isEmpty())
+         {
             return; //nothing to do
+        }
 
         //Remote cant update self's bucket
         receivedBuckets.remove(selfAddress);
@@ -222,15 +228,20 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
         for (Map.Entry<Address, Bucket> entry : receivedBuckets.entrySet()){
 
             Long localVersion = versions.get(entry.getKey());
-            if (localVersion == null) localVersion = -1L;
+            if (localVersion == null) {
+                localVersion = -1L;
+            }
 
             Bucket receivedBucket = entry.getValue();
 
-            if (receivedBucket == null)
+            if (receivedBucket == null) {
                 continue;
+            }
 
             Long remoteVersion = receivedBucket.getVersion();
-            if (remoteVersion == null) remoteVersion = -1L;
+            if (remoteVersion == null) {
+                remoteVersion = -1L;
+            }
 
             //update only if remote version is newer
             if ( remoteVersion.longValue() > localVersion.longValue() ) {
index ee96cb8..e0d145d 100644 (file)
@@ -1,14 +1,22 @@
 package org.opendaylight.controller.remote.rpc.registry;
 
-
 import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.ChildActorPath;
 import akka.actor.Props;
+import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
 import com.google.common.base.Predicate;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -16,224 +24,226 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.utils.ConditionalProbe;
 import org.opendaylight.yangtools.yang.common.QName;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
-import javax.annotation.Nullable;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
-
 public class RpcRegistryTest {
 
-  private static ActorSystem node1;
-  private static ActorSystem node2;
-  private static ActorSystem node3;
-
-  private ActorRef registry1;
-  private ActorRef registry2;
-  private ActorRef registry3;
-
-  @BeforeClass
-  public static void setup() throws InterruptedException {
-    RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
-    RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
-    RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").build();
-    node1 = ActorSystem.create("opendaylight-rpc", config1.get());
-    node2 = ActorSystem.create("opendaylight-rpc", config2.get());
-    node3 = ActorSystem.create("opendaylight-rpc", config3.get());
-  }
-
-  @AfterClass
-  public static void teardown() {
-    JavaTestKit.shutdownActorSystem(node1);
-    JavaTestKit.shutdownActorSystem(node2);
-    JavaTestKit.shutdownActorSystem(node3);
-    if (node1 != null)
-      node1.shutdown();
-    if (node2 != null)
-      node2.shutdown();
-    if (node3 != null)
-      node3.shutdown();
-
-  }
-
-  @Before
-  public void createRpcRegistry() throws InterruptedException {
-    registry1 = node1.actorOf(Props.create(RpcRegistry.class));
-    registry2 = node2.actorOf(Props.create(RpcRegistry.class));
-    registry3 = node3.actorOf(Props.create(RpcRegistry.class));
-  }
-
-  @After
-  public void stopRpcRegistry() throws InterruptedException {
-    if (registry1 != null)
-      node1.stop(registry1);
-    if (registry2 != null)
-      node2.stop(registry2);
-    if (registry3 != null)
-      node3.stop(registry3);
-  }
-
-  /**
-   * One node cluster.
-   * 1. Register rpc, ensure router can be found
-   * 2. Then remove rpc, ensure its deleted
-   *
-   * @throws URISyntaxException
-   * @throws InterruptedException
-   */
-  @Test
-  public void testAddRemoveRpcOnSameNode() throws URISyntaxException, InterruptedException {
-
-    final JavaTestKit mockBroker = new JavaTestKit(node1);
-
-    final ActorPath bucketStorePath = new ChildActorPath(registry1.path(), "store");
-
-    //install probe
-    final JavaTestKit probe1 = createProbeForMessage(
-        node1, bucketStorePath, Messages.BucketStoreMessages.UpdateBucket.class);
-
-    //Add rpc on node 1
-    registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef());
-    registry1.tell(getAddRouteMessage(), mockBroker.getRef());
-
-    //Bucket store should get an update bucket message. Updated bucket contains added rpc.
-    probe1.expectMsgClass(
-        FiniteDuration.apply(10, TimeUnit.SECONDS),
-        Messages.BucketStoreMessages.UpdateBucket.class);
-
-    //Now remove rpc
-    registry1.tell(getRemoveRouteMessage(), mockBroker.getRef());
-
-    //Bucket store should get an update bucket message. Rpc is removed in the updated bucket
-    probe1.expectMsgClass(
-        FiniteDuration.apply(10, TimeUnit.SECONDS),
-        Messages.BucketStoreMessages.UpdateBucket.class);
-
-
-  }
-
-
-  /**
-   * Three node cluster.
-   * 1. Register rpc on 1 node, ensure 2nd node gets updated
-   * 2. Remove rpc on 1 node, ensure 2nd node gets updated
-   *
-   * @throws URISyntaxException
-   * @throws InterruptedException
-   */
-  @Test
-  public void testRpcAddRemoveInCluster() throws URISyntaxException, InterruptedException {
-
-    final JavaTestKit mockBroker1 = new JavaTestKit(node1);
-
-    //install probe on node2's bucket store
-    final ActorPath bucketStorePath = new ChildActorPath(registry2.path(), "store");
-    final JavaTestKit probe2 = createProbeForMessage(
-        node2, bucketStorePath, Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
-
-    //Add rpc on node 1
-    registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
-    registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
-
-    //Bucket store on node2 should get a message to update its local copy of remote buckets
-    probe2.expectMsgClass(
-        FiniteDuration.apply(10, TimeUnit.SECONDS),
-        Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
-
-    //Now remove
-    registry1.tell(getRemoveRouteMessage(), mockBroker1.getRef());
-
-    //Bucket store on node2 should get a message to update its local copy of remote buckets
-    probe2.expectMsgClass(
-        FiniteDuration.apply(10, TimeUnit.SECONDS),
-        Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
-
-  }
-
-  /**
-   * Three node cluster.
-   * Register rpc on 2 nodes. Ensure 3rd gets updated.
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testRpcAddedOnMultiNodes() throws Exception {
-
-    final JavaTestKit mockBroker1 = new JavaTestKit(node1);
-    final JavaTestKit mockBroker2 = new JavaTestKit(node2);
-    final JavaTestKit mockBroker3 = new JavaTestKit(node3);
-
-    registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef());
-
-    //install probe on node 3
-    final ActorPath bucketStorePath = new ChildActorPath(registry3.path(), "store");
-    final JavaTestKit probe3 = createProbeForMessage(
-        node3, bucketStorePath, Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
-
-
-    //Add rpc on node 1
-    registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
-    registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
-
-    probe3.expectMsgClass(
-        FiniteDuration.apply(10, TimeUnit.SECONDS),
-        Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
-
+    private static ActorSystem node1;
+    private static ActorSystem node2;
+    private static ActorSystem node3;
+
+    private ActorRef registry1;
+    private ActorRef registry2;
+    private ActorRef registry3;
+
+    @BeforeClass
+    public static void staticSetup() throws InterruptedException {
+      RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
+      RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
+      RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").build();
+      node1 = ActorSystem.create("opendaylight-rpc", config1.get());
+      node2 = ActorSystem.create("opendaylight-rpc", config2.get());
+      node3 = ActorSystem.create("opendaylight-rpc", config3.get());
+    }
+
+    @AfterClass
+    public static void staticTeardown() {
+      JavaTestKit.shutdownActorSystem(node1);
+      JavaTestKit.shutdownActorSystem(node2);
+      JavaTestKit.shutdownActorSystem(node3);
+    }
+
+    @Before
+    public void setup() {
+        registry1 = node1.actorOf(Props.create(RpcRegistry.class));
+        registry2 = node2.actorOf(Props.create(RpcRegistry.class));
+        registry3 = node3.actorOf(Props.create(RpcRegistry.class));
+    }
+
+    @After
+    public void teardown() {
+        if (registry1 != null) {
+            node1.stop(registry1);
+        }
+        if (registry2 != null) {
+            node2.stop(registry2);
+        }
+        if (registry3 != null) {
+            node3.stop(registry3);
+        }
+    }
+
+    /**
+     * One node cluster. 1. Register rpc, ensure router can be found 2. Then remove rpc, ensure its
+     * deleted
+     *
+     * @throws URISyntaxException
+     * @throws InterruptedException
+     */
+    @Test
+    public void testAddRemoveRpcOnSameNode() throws Exception {
+
+        System.out.println("testAddRemoveRpcOnSameNode starting");
+
+        final JavaTestKit mockBroker = new JavaTestKit(node1);
+
+        final ActorPath bucketStorePath = new ChildActorPath(registry1.path(), "store");
+
+        // Add rpc on node 1
+        registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef());
+
+        // install probe
+        final JavaTestKit probe1 = createProbeForMessage(node1, bucketStorePath,
+                Messages.BucketStoreMessages.UpdateBucket.class);
+
+        registry1.tell(getAddRouteMessage(), mockBroker.getRef());
+
+        // Bucket store should get an update bucket message. Updated bucket contains added rpc.
+        probe1.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
+                Messages.BucketStoreMessages.UpdateBucket.class);
+
+        // Now remove rpc
+        registry1.tell(getRemoveRouteMessage(), mockBroker.getRef());
+
+        // Bucket store should get an update bucket message. Rpc is removed in the updated bucket
+        probe1.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
+                Messages.BucketStoreMessages.UpdateBucket.class);
+
+        System.out.println("testAddRemoveRpcOnSameNode ending");
+
+    }
+
+    /**
+     * Three node cluster. 1. Register rpc on 1 node, ensure 2nd node gets updated 2. Remove rpc on
+     * 1 node, ensure 2nd node gets updated
+     *
+     * @throws URISyntaxException
+     * @throws InterruptedException
+     */
+    @Test
+    public void testRpcAddRemoveInCluster() throws Exception {
+
+        System.out.println("testRpcAddRemoveInCluster starting");
+
+        final JavaTestKit mockBroker1 = new JavaTestKit(node1);
+
+        // install probe on node2's bucket store
+        final ActorPath bucketStorePath = new ChildActorPath(registry2.path(), "store");
+        final JavaTestKit probe2 = createProbeForMessage(node2, bucketStorePath,
+                Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+
+        // Add rpc on node 1
+        registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
+        registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
+
+        // Bucket store on node2 should get a message to update its local copy of remote buckets
+        probe2.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
+                Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+
+        // Now remove
+        registry1.tell(getRemoveRouteMessage(), mockBroker1.getRef());
+
+        // Bucket store on node2 should get a message to update its local copy of remote buckets
+        probe2.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
+                Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+
+        System.out.println("testRpcAddRemoveInCluster ending");
+    }
+
+    /**
+     * Three node cluster. Register rpc on 2 nodes. Ensure 3rd gets updated.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testRpcAddedOnMultiNodes() throws Exception {
+
+        final JavaTestKit mockBroker1 = new JavaTestKit(node1);
+        final JavaTestKit mockBroker2 = new JavaTestKit(node2);
+        final JavaTestKit mockBroker3 = new JavaTestKit(node3);
+
+        registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef());
+
+        // install probe on node 3
+        final ActorPath bucketStorePath = new ChildActorPath(registry3.path(), "store");
+        final JavaTestKit probe3 = createProbeForMessage(node3, bucketStorePath,
+                Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+
+        // Add rpc on node 1
+        registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
+        registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
 
-    //Add same rpc on node 2
-    registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
-    registry2.tell(getAddRouteMessage(), mockBroker2.getRef());
+        probe3.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
+                Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
 
-    probe3.expectMsgClass(
-        FiniteDuration.apply(10, TimeUnit.SECONDS),
-        Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
-  }
-
-  private JavaTestKit createProbeForMessage(ActorSystem node, ActorPath subjectPath, final Class<?> clazz) {
-    final JavaTestKit probe = new JavaTestKit(node);
+        // Add same rpc on node 2
+        registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
+        registry2.tell(getAddRouteMessage(), mockBroker2.getRef());
 
-    ConditionalProbe conditionalProbe =
-        new ConditionalProbe(probe.getRef(), new Predicate<Object>() {
-          @Override
-          public boolean apply(@Nullable Object input) {
-              if (input != null)
-                return clazz.equals(input.getClass());
-              else
-                  return false;
-          }
-        });
-
-    ActorSelection subject = node.actorSelection(subjectPath);
-    subject.tell(conditionalProbe, ActorRef.noSender());
-
-    return probe;
+        probe3.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
+                Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+    }
 
-  }
+    private JavaTestKit createProbeForMessage(ActorSystem node, ActorPath subjectPath, final Class<?> clazz)
+            throws Exception {
+        final JavaTestKit probe = new JavaTestKit(node);
 
-  private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException {
-    return new AddOrUpdateRoutes(createRouteIds());
-  }
-
-  private RemoveRoutes getRemoveRouteMessage() throws URISyntaxException {
-    return new RemoveRoutes(createRouteIds());
-  }
+        ConditionalProbe conditionalProbe = new ConditionalProbe(probe.getRef(), new Predicate<Object>() {
+            @Override
+            public boolean apply(@Nullable Object input) {
+                if (input != null) {
+                    return clazz.equals(input.getClass());
+                } else {
+                    return false;
+                }
+            }
+        });
 
-  private List<RpcRouter.RouteIdentifier<?, ?, ?>> createRouteIds() throws URISyntaxException {
-    QName type = new QName(new URI("/mockrpc"), "mockrpc");
-    List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new ArrayList<>();
-    routeIds.add(new RouteIdentifierImpl(null, type, null));
-    return routeIds;
-  }
+        FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS);
+        Timeout timeout = new Timeout(duration);
+        int maxTries = 30;
+        int i = 0;
+        while(true) {
+            ActorSelection subject = node.actorSelection(subjectPath);
+            Future<Object> future = Patterns.ask(subject, conditionalProbe, timeout);
+
+            try {
+                Await.ready(future, duration);
+                break;
+            } catch (TimeoutException | InterruptedException e) {
+                if(++i > maxTries) {
+                    throw e;
+                }
+            }
+        }
+
+        return probe;
+
+    }
+
+    private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException {
+        return new AddOrUpdateRoutes(createRouteIds());
+    }
+
+    private RemoveRoutes getRemoveRouteMessage() throws URISyntaxException {
+        return new RemoveRoutes(createRouteIds());
+    }
+
+    private List<RpcRouter.RouteIdentifier<?, ?, ?>> createRouteIds() throws URISyntaxException {
+        QName type = new QName(new URI("/mockrpc"), "mockrpc");
+        List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new ArrayList<>();
+        routeIds.add(new RouteIdentifierImpl(null, type, null));
+        return routeIds;
+    }
 
 }
diff --git a/opendaylight/md-sal/statistics-manager-config/pom.xml b/opendaylight/md-sal/statistics-manager-config/pom.xml
new file mode 100644 (file)
index 0000000..f4b5f76
--- /dev/null
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>sal-parent</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+
+    <groupId>org.opendaylight.controller.md</groupId>
+    <artifactId>statistics-manager-config</artifactId>
+    <description>Configuration files for statistics manager</description>
+    <packaging>jar</packaging>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>attach-artifacts</id>
+                        <goals>
+                            <goal>attach-artifact</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <artifacts>
+                                <artifact>
+                                    <file>${project.build.directory}/classes/initial/30-statistics-manager.xml</file>
+                                    <type>xml</type>
+                                    <classifier>config</classifier>
+                                </artifact>
+                            </artifacts>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/opendaylight/md-sal/statistics-manager-config/src/main/resources/initial/30-statistics-manager.xml b/opendaylight/md-sal/statistics-manager-config/src/main/resources/initial/30-statistics-manager.xml
new file mode 100644 (file)
index 0000000..797c14e
--- /dev/null
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- vi: set et smarttab sw=4 tabstop=4: -->
+<!--
+ Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<snapshot>
+    <configuration>
+        <data xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+            <modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+                <module>
+                    <type xmlns:statsmanager="urn:opendaylight:params:xml:ns:yang:controller:md:sal:statistics-manager">
+                        statsmanager:statistics-manager
+                    </type>
+                    <name>statistics-manager</name>
+
+                    <rpc-registry>
+                        <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-rpc-registry</type>
+                        <name>binding-rpc-broker</name>
+                    </rpc-registry>
+
+                    <data-broker>
+                        <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-async-data-broker</type>
+                        <name>binding-data-broker</name>
+                    </data-broker>
+
+                    <notification-service>
+                        <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">
+                            binding:binding-notification-service
+                        </type>
+                        <name>binding-notification-broker</name>
+                    </notification-service>
+
+                    <statistics-manager-settings>
+                        <min-request-net-monitor-interval>3000</min-request-net-monitor-interval>
+                        <max-nodes-for-collector>16</max-nodes-for-collector>
+                    </statistics-manager-settings>
+
+                </module>
+            </modules>
+        </data>
+    </configuration>
+
+    <required-capabilities>
+        <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:statistics-manager?module=statistics-manager&amp;revision=2014-09-25</capability>
+    </required-capabilities>
+
+</snapshot>
+
index eb6c51b..234e108 100644 (file)
       <artifactId>org.osgi.core</artifactId>
       <scope>provided</scope>
     </dependency>
+      <dependency>
+          <groupId>org.opendaylight.controller</groupId>
+          <artifactId>sal-binding-config</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>org.opendaylight.controller</groupId>
+          <artifactId>config-api</artifactId>
+      </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-log4j12</artifactId>
 
   <build>
     <plugins>
-      <plugin>
-        <groupId>org.apache.felix</groupId>
-        <artifactId>maven-bundle-plugin</artifactId>
-        <configuration>
-          <instructions>
-            <Bundle-Activator>org.opendaylight.controller.md.statistics.manager.StatisticsManagerActivator</Bundle-Activator>
-          </instructions>
-        </configuration>
-      </plugin>
+        <plugin>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>maven-bundle-plugin</artifactId>
+            <configuration>
+                <instructions>
+                    <Import-Package>*</Import-Package>
+                </instructions>
+            </configuration>
+        </plugin>
+        <plugin>
+            <groupId>org.opendaylight.yangtools</groupId>
+            <artifactId>yang-maven-plugin</artifactId>
+            <executions>
+                <execution>
+                    <id>config</id>
+                    <goals>
+                        <goal>generate-sources</goal>
+                    </goals>
+                    <configuration>
+                        <codeGenerators>
+                            <generator>
+                                <codeGeneratorClass>org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator</codeGeneratorClass>
+                                <outputBaseDir>${jmxGeneratorPath}</outputBaseDir>
+                                <additionalConfiguration>
+                                    <namespaceToPackage1>urn:opendaylight:params:xml:ns:yang:controller==org.opendaylight.controller.config.yang</namespaceToPackage1>
+                                </additionalConfiguration>
+                            </generator>
+                            <generator>
+                                <codeGeneratorClass>org.opendaylight.yangtools.maven.sal.api.gen.plugin.CodeGeneratorImpl</codeGeneratorClass>
+                                <outputBaseDir>${salGeneratorPath}</outputBaseDir>
+                            </generator>
+                        </codeGenerators>
+                        <inspectDependencies>true</inspectDependencies>
+                    </configuration>
+                </execution>
+            </executions>
+        </plugin>
     </plugins>
   </build>
   <scm>
diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/config/yang/md/sal/statistics_manager/StatisticsManagerModule.java