Merge "BUG-1690: catch wildcard InstanceIdentifiers"
authorTony Tkacik <ttkacik@cisco.com>
Fri, 5 Sep 2014 07:10:12 +0000 (07:10 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 5 Sep 2014 07:10:12 +0000 (07:10 +0000)
111 files changed:
features/akka/pom.xml [new file with mode: 0644]
features/akka/src/main/resources/features.xml [new file with mode: 0644]
features/mdsal/pom.xml
features/mdsal/src/main/resources/features.xml
features/netconf/pom.xml
features/netconf/src/main/resources/features.xml
features/pom.xml
opendaylight/commons/opendaylight/pom.xml
opendaylight/distribution/opendaylight-karaf-resources/pom.xml
opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/etc/custom.properties
opendaylight/distribution/opendaylight-karaf/pom.xml
opendaylight/distribution/opendaylight/pom.xml
opendaylight/distribution/opendaylight/src/main/resources/configuration/config.ini
opendaylight/md-sal/sal-akka-raft/pom.xml
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SerializationUtils.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/InstallSnapshotMessages.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/resources/InstallSnapshot.proto [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-clustering-commons/pom.xml
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/persistent/PersistentMessages.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/XmlUtils.java
opendaylight/md-sal/sal-clustering-commons/src/main/resources/Persistent.proto
opendaylight/md-sal/sal-clustering-config/pom.xml
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf
opendaylight/md-sal/sal-distributed-datastore/pom.xml
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ActorSystemFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreProperties.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/AbstractBaseMBean.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardMBeanFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMBean.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModuleFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModuleFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModificationTest.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/AbstractDOMStoreTransaction.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStoreFactory.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedReadTransaction.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedReadWriteTransaction.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedWriteTransaction.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/yang/opendaylight-inmemory-datastore-provider.yang
opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDataStoreTest.java
opendaylight/md-sal/sal-remoterpc-connector/pom.xml
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorSystemFactory.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcErrorsException.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ErrorResponse.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/ActorUtil.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/AkkaConfigurationReader.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/DefaultAkkaConfigurationReader.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/ActorSystemFactoryTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/test-rpc.yang [new file with mode: 0644]
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/rest/connector/RestConnectorModule.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfProviderImpl.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServer.java

diff --git a/features/akka/pom.xml b/features/akka/pom.xml
new file mode 100644 (file)
index 0000000..f1f3017
--- /dev/null
@@ -0,0 +1,264 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Necessary TODO: Put your copyright here.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+   <modelVersion>4.0.0</modelVersion>
+   <parent>
+    <groupId>org.opendaylight.controller</groupId>
+    <artifactId>commons.opendaylight</artifactId>
+    <version>1.4.2-SNAPSHOT</version>
+    <relativePath>../../opendaylight/commons/opendaylight</relativePath>
+   </parent>
+   <artifactId>features-akka</artifactId>
+   <groupId>org.opendaylight.controller</groupId>
+   <packaging>jar</packaging>
+   <properties>
+      <features.file>features.xml</features.file>
+      <!-- Optional TODO: Move these properties to your parent pom and possibly
+            DependencyManagement section of your parent pom -->
+      <branding.version>1.0.0-SNAPSHOT</branding.version>
+      <karaf.resources.version>1.4.2-SNAPSHOT</karaf.resources.version>
+      <karaf.version>3.0.1</karaf.version>
+      <feature.test.version>0.6.2-SNAPSHOT</feature.test.version>
+      <karaf.empty.version>1.4.2-SNAPSHOT</karaf.empty.version>
+      <surefire.version>2.16</surefire.version>
+   </properties>
+   <dependencies>
+    <!--
+      Necessary TODO: Put dependencies on any feature repos
+      you use in your features.xml file.
+
+      Note: they will need to be <type>xml</xml>
+      and <classifier>features</classifier>.
+      One other thing to watch for is to make sure they are
+      <scope>compile</compile>, which they should be by default,
+      but be cautious lest they be at a different scope in a parent pom.
+
+      Examples:
+        <dependency>
+          <groupId>org.opendaylight.yangtools</groupId>
+          <artifactId>features-yangtools</artifactId>
+          <version>0.6.2-SNAPSHOT</version>
+          <classifier>features</classifier>
+          <type>xml</type>
+        </dependency>
+        <dependency>
+          <groupId>org.opendaylight.controller</groupId>
+          <artifactId>features-mdsal</artifactId>
+          <version>1.1-SNAPSHOT</version>
+          <classifier>features</classifier>
+          <type>xml</type>
+        </dependency>
+        <dependency>
+          <groupId>org.opendaylight.openflowplugin</groupId>
+          <artifactId>features-openflowplugin</artifactId>
+          <version>0.0.3-SNAPSHOT</version>
+          <classifier>features</classifier>
+          <type>xml</type>
+        </dependency>
+    -->
+
+    <!--
+      Necessary TODO: Put dependencies for bundles directly referenced
+      in your features.xml file.  For every <bundle> reference in your
+      features.xml file, you need a corresponding dependency here.
+
+      Examples:
+      <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>controller-provider</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>controller-model</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+    -->
+
+    <!--
+      Necessary TODO: Put dependencies for configfiles directly referenced
+      in your features.xml file.  For every <configfile> reference in your
+      features.xml file, you need a corresponding dependency here.
+
+      Example (presuming here version is coming from the parent pom):
+      <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>controller-config</artifactId>
+        <version>${project.version}</version>
+        <type>xml</type>
+        <classifier>config</classifier>
+      </dependency>
+    -->
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>${scala.version}.${scala.micro.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-reflect</artifactId>
+      <version>${scala.version}.${scala.micro.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe</groupId>
+      <artifactId>config</artifactId>
+      <version>${typesafe.config.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-actor_${scala.version}</artifactId>
+      <version>${akka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-slf4j_${scala.version}</artifactId>
+      <version>${akka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-osgi_${scala.version}</artifactId>
+      <version>${akka.version}</version>
+    </dependency>
+    <dependency>
+        <groupId>org.uncommons.maths</groupId>
+        <artifactId>uncommons-maths</artifactId>
+        <version>${uncommons.maths.version}</version>
+        <exclusions>
+            <exclusion>
+                <groupId>jfree</groupId>
+                <artifactId>jcommon</artifactId>
+            </exclusion>
+            <exclusion>
+                <groupId>jfree</groupId>
+                <artifactId>jfreechart</artifactId>
+            </exclusion>
+        </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>${protobuf.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty</artifactId>
+      <version>3.8.0.Final</version>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-remote_${scala.version}</artifactId>
+      <version>${akka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-cluster_${scala.version}</artifactId>
+      <version>${akka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.iq80.leveldb</groupId>
+      <artifactId>leveldb</artifactId>
+      <version>${leveldb.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.fusesource.leveldbjni</groupId>
+      <artifactId>leveldbjni-all</artifactId>
+      <version>${leveldbjni.version}</version>
+    </dependency>
+    <!--
+      Optional TODO: Remove TODO comments.
+    -->
+    <!-- test to validate features.xml -->
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>features-test</artifactId>
+      <version>${feature.test.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <!-- dependency for opendaylight-karaf-empty for use by testing -->
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>opendaylight-karaf-empty</artifactId>
+      <version>${karaf.empty.version}</version>
+      <type>zip</type>
+    </dependency>
+    <!-- Uncomment this if you get an error : java.lang.NoSuchMethodError: org.slf4j.helpers.MessageFormatter.format(Ljava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)Lorg/slf4j/helpers/FormattingTuple;
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-simple</artifactId>
+      <version>1.7.2</version>
+    </dependency>
+    -->
+
+   </dependencies>
+   <build>
+      <resources>
+         <resource>
+            <directory>src/main/resources</directory>
+            <filtering>true</filtering>
+         </resource>
+      </resources>
+      <plugins>
+         <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-resources-plugin</artifactId>
+            <executions>
+               <execution>
+                  <id>filter</id>
+                  <phase>generate-resources</phase>
+                  <goals>
+                     <goal>resources</goal>
+                  </goals>
+               </execution>
+            </executions>
+         </plugin>
+         <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <executions>
+               <execution>
+                  <id>attach-artifacts</id>
+                  <phase>package</phase>
+                  <goals>
+                     <goal>attach-artifact</goal>
+                  </goals>
+                  <configuration>
+                     <artifacts>
+                        <artifact>
+                           <file>${project.build.directory}/classes/${features.file}</file>
+                           <type>xml</type>
+                           <classifier>features</classifier>
+                        </artifact>
+                     </artifacts>
+                  </configuration>
+               </execution>
+            </executions>
+         </plugin>
+         <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <version>${surefire.version}</version>
+            <configuration>
+              <systemPropertyVariables>
+                <karaf.distro.groupId>org.opendaylight.controller</karaf.distro.groupId>
+                <karaf.distro.artifactId>opendaylight-karaf-empty</karaf.distro.artifactId>
+                <karaf.distro.version>${karaf.empty.version}</karaf.distro.version>
+              </systemPropertyVariables>
+              <dependenciesToScan>
+               <dependency>org.opendaylight.yangtools:features-test</dependency>
+              </dependenciesToScan>
+            </configuration>
+          </plugin>
+      </plugins>
+   </build>
+   <scm>
+      <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
+      <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+      <tag>HEAD</tag>
+      <url>https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=summary</url>
+   </scm>
+</project>
diff --git a/features/akka/src/main/resources/features.xml b/features/akka/src/main/resources/features.xml
new file mode 100644 (file)
index 0000000..182ff76
--- /dev/null
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- vi: set et smarttab sw=4 tabstop=4: -->
+<!--
+ Necessary TODO: Put your copyright statement here
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<features name="odl-controller-${project.version}" xmlns="http://karaf.apache.org/xmlns/features/v1.2.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://karaf.apache.org/xmlns/features/v1.2.0 http://karaf.apache.org/xmlns/features/v1.2.0">
+    <!--
+        Necessary TODO: Please read the features guidelines:
+        https://wiki.opendaylight.org/view/Runtime:Karaf_Features_Guidelines#Feature_Best_Practices
+    -->
+    <!--
+    Necessary TODO: Add repo entries for the repositories of features you refer to
+        in this feature file but do not define here.
+        Examples:
+            <repository>mvn:org.opendaylight.yangtools/features-yangtools/0.6.2-SNAPSHOT/xml/features</repository>
+            <repository>mvn:org.opendaylight.controller/features-mdsal/1.1-SNAPSHOT/xml/features</repository>
+            <repository>mvn:org.opendaylight.openflowplugin/features-openflowplugin/0.0.3-SNAPSHOT/xml/features</repository>
+    -->
+    <feature name='odl-akka-all' version='${project.version}' description='OpenDaylight :: Akka :: All'>
+        <!--
+            Necessary TODO:
+            List all of the user consumable features you define in this feature file here.
+            Generally you would *not* list individual bundles here, but only features defined in *this* file.
+            It is useful to list them in the same order they occur in the file.
+
+            Examples:
+            <feature version='${project.version}'>odl-controller-provider</feature>
+            <feature version='${project.version}'>odl-controller-model</feature>
+        -->
+        <feature version="${scala.version}">odl-akka-scala</feature>
+        <feature version="${akka.version}">odl-akka-system</feature>
+        <feature version="${akka.version}">odl-akka-clustering</feature>
+        <feature version='0.7'>odl-akka-leveldb</feature>
+        <feature version="${akka.version}">odl-akka-persistence</feature>
+    </feature>
+    <!--
+        Necessary TODO: Define your features.  It is useful to list then in order of dependency.  So if A depends on B, list A first.
+        When naming your features please be mindful of the guidelines:
+            https://wiki.opendaylight.org/view/Runtime:Karaf_Features_Guidelines
+        Particularly:
+            a) Prefixing names with 'odl-': https://wiki.opendaylight.org/view/Runtime:Karaf_Features_Guidelines#Feature_Naming
+            b) Descriptions: https://wiki.opendaylight.org/view/Runtime:Karaf_Features_Guidelines#Description
+            c) Avoid start-levels: https://wiki.opendaylight.org/view/Runtime:Karaf_Features_Guidelines#Avoid_start-levels
+
+        It's also nice to list inside a feature, first the features it needs, then the bundles it needs, then the configfiles.
+        Examples:
+
+        * Basic MD-SAL Provider
+        <feature name='odl-controller-provider' version='${project.version}' description='OpenDaylight :: controller :: Provider '>
+            <feature version='1.1-SNAPSHOT'>odl-mdsal-broker</feature>
+            <feature version='${project.version}'>odl-controller-model</feature>
+            <bundle>mvn:org.opendaylight.controller/controller-provider/${project.version}</bundle>
+            ... whatever other bundles you need
+        </feature>
+
+        * Basic MD-SAL Model feature
+        <feature name='odl-controller-model' version='${project.version}' description='OpenDaylight :: controller :: Model'>
+            <feature version='0.6.2-SNAPSHOT'>odl-yangtools-binding</feature>
+            <feature version='0.6.2-SNAPSHOT'>odl-yangtools-models</feature>
+            <bundle>mvn:org.opendaylight.controller/controller-model/${project.version}</bundle>
+            ... whatever other bundles you need
+        </feature>
+
+        * Config Subsystem example - the config file is your config subsystem configuration
+        <feature name='odl-controller-provider' version='${project.version}' description='OpenDaylight :: controller :: Provider'>
+            <feature version='1.1-SNAPSHOT'>odl-mdsal-broker</feature>
+            <bundle>mvn:org.opendaylight.controller/controller-provider/${project.version}</bundle>
+            <configfile finalname="etc/opendaylight/karaf/80-controller.xml">mvn:org.opendaylight.controller/controller-config/${project.version}/xml/config</configfile>
+            ... whatever other bundles you need
+        </feature>
+
+        * Basic MD-SAL Provider that uses openflowplugin-flow-services (which brings along odl-mdsal-broker)
+        <feature name='odl-controller-provider' version='${project.version}' description='OpenDaylight :: controller :: Provider'>
+            <feature version='0.0.3-SNAPSHOT'>odl-openflowplugin-flow-services</feature>
+            <bundle>mvn:org.opendaylight.controller/controller-provider/${project.version}</bundle>
+            ... whatever other bundles you need
+        </feature>
+
+    -->
+    <feature name="odl-akka-scala" description="Scala Runtime for OpenDaylight" version="${scala.version}">
+        <bundle>mvn:org.scala-lang/scala-library/${scala.version}.${scala.micro.version}</bundle>
+        <bundle>mvn:org.scala-lang/scala-reflect/${scala.version}.${scala.micro.version}</bundle>
+    </feature>
+    <feature name="odl-akka-system" description="Akka Actor Framework System Bundles" version="${akka.version}">
+        <feature version="${scala.version}">odl-akka-scala</feature>
+        <bundle>mvn:com.typesafe/config/${typesafe.config.version}</bundle>
+        <bundle>mvn:com.typesafe.akka/akka-actor_${scala.version}/${akka.version}</bundle>
+        <bundle>mvn:com.typesafe.akka/akka-slf4j_${scala.version}/${akka.version}</bundle>
+        <bundle>mvn:com.typesafe.akka/akka-osgi_${scala.version}/${akka.version}</bundle>
+    </feature>
+    <feature name="odl-akka-clustering" description="Akka Clustering" version="${akka.version}">
+        <feature version="${akka.version}">odl-akka-system</feature>
+        <bundle>wrap:mvn:org.uncommons.maths/uncommons-maths/${uncommons.maths.version}</bundle>
+        <bundle>mvn:com.google.protobuf/protobuf-java/${protobuf.version}</bundle>
+        <bundle>mvn:io.netty/netty/3.8.0.Final</bundle>
+        <bundle>mvn:com.typesafe.akka/akka-remote_${scala.version}/${akka.version}</bundle>
+        <bundle>mvn:com.typesafe.akka/akka-cluster_${scala.version}/${akka.version}</bundle>
+    </feature>
+    <feature name='odl-akka-leveldb' description='LevelDB' version='0.7'>
+        <bundle>wrap:mvn:org.iq80.leveldb/leveldb/${leveldb.version}</bundle>
+        <bundle>mvn:org.fusesource.leveldbjni/leveldbjni-all/${leveldbjni.version}</bundle>
+    </feature>
+    <feature name='odl-akka-persistence' description='Akka Persistence' version="${akka.version}">
+        <feature version='0.7'>odl-akka-leveldb</feature>
+        <feature version="${akka.version}">odl-akka-system</feature>
+        <bundle>mvn:com.typesafe.akka/akka-persistence-experimental_${scala.version}/${akka.version}</bundle>
+        <bundle>wrap:mvn:com.google.protobuf/protobuf-java/${protobuf.version}$overwrite=merge&amp;DynamicImport-Package=org.opendaylight.controller.protobuff.messages.*;org.opendaylight.controller.cluster.raft.protobuff.client.messages.*</bundle>
+    </feature>
+    <!-- Optional TODO: Remove TODO Comments -->
+
+</features>
index c6856c8..38fe92f 100644 (file)
       <classifier>features</classifier>
       <type>xml</type>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>features-akka</artifactId>
+      <version>${commons.opendaylight.version}</version>
+      <classifier>features</classifier>
+      <type>xml</type>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-core-api</artifactId>
       <type>xml</type>
       <classifier>config</classifier>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-distributed-datastore</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-remoterpc-connector</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-clustering-commons</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-akka-raft</artifactId>
+      <version>${mdsal.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-clustering-config</artifactId>
+      <version>${mdsal.version}</version>
+      <type>xml</type>
+      <classifier>config</classifier>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-netconf-connector</artifactId>
index 408be62..619eaee 100644 (file)
@@ -7,11 +7,13 @@
     <repository>mvn:org.opendaylight.controller/features-config/${config.version}/xml/features</repository>
     <repository>mvn:org.opendaylight.controller/features-config-persister/${config.version}/xml/features</repository>
     <repository>mvn:org.opendaylight.controller/features-config-netty/${config.version}/xml/features</repository>
+    <repository>mvn:org.opendaylight.controller/features-akka/${commons.opendaylight.version}/xml/features</repository>
     <feature name='odl-mdsal-all' version='${project.version}' description="OpenDaylight :: MDSAL :: All">
         <feature version='${project.version}'>odl-mdsal-broker</feature>
         <feature version='${project.version}'>odl-mdsal-netconf-connector</feature>
         <feature version='${project.version}'>odl-restconf</feature>
         <feature version='${project.version}'>odl-mdsal-xsql</feature>
+        <feature version='${project.version}'>odl-mdsal-clustering</feature>
         <feature version='${project.version}'>odl-toaster</feature>
     </feature>
     <feature name='odl-mdsal-broker' version='${project.version}' description="OpenDaylight :: MDSAL :: Broker">
         <bundle>mvn:org.opendaylight.controller/sal-netconf-connector/${project.version}</bundle>
         <bundle>mvn:org.opendaylight.controller.model/model-inventory/${project.version}</bundle>
         <bundle>mvn:org.opendaylight.controller/netconf-config-dispatcher/${config.version}</bundle>
+        <configfile finalname='${config.configfile.directory}/${config.netconf.client.configfile}'>mvn:org.opendaylight.controller/netconf-config/${netconf.version}/xml/config</configfile>
+    </feature>
+    <feature name='odl-mdsal-netconf-connector-ssh' version='${project.version}' description="OpenDaylight :: MDSAL :: Netconf Connector + Netconf SSH Server + loopback connection configuration">
+        <feature version='${netconf.version}'>odl-netconf-ssh</feature>
+        <feature version='${project.version}'>odl-mdsal-netconf-connector</feature>
         <configfile finalname="${config.configfile.directory}/${config.netconf.connector.configfile}">mvn:org.opendaylight.controller/netconf-connector-config/${netconf.version}/xml/config</configfile>
     </feature>
     <feature name='odl-restconf' version='${project.version}' description="OpenDaylight :: Restconf">
         <bundle>mvn:com.sun.jersey/jersey-servlet/${jersey.version}</bundle>
         <bundle>wrap:mvn:org.json/json/${org.json.version}</bundle>
     </feature>
+    <feature name ='odl-mdsal-clustering-commons' version='${project.version}'>
+        <feature version='${project.version}'>odl-mdsal-broker</feature>
+        <feature version='${akka.version}'>odl-akka-system</feature>
+        <feature version='${akka.version}'>odl-akka-persistence</feature>
+        <bundle>mvn:org.opendaylight.controller/sal-clustering-commons/${project.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/sal-akka-raft/${project.version}</bundle>
+        <bundle>mvn:com.codahale.metrics/metrics-core/3.0.1</bundle>
+    </feature>
+    <feature name ='odl-mdsal-distributed-datastore' version='${project.version}'>
+        <feature version='${project.version}'>odl-mdsal-broker</feature>
+        <feature version='${project.version}'>odl-mdsal-clustering-commons</feature>
+        <feature version='${akka.version}'>odl-akka-clustering</feature>
+        <bundle>mvn:org.opendaylight.controller/sal-distributed-datastore/${project.version}</bundle>
+    </feature>
+    <feature name ='odl-mdsal-remoterpc-connector' version='${project.version}'>
+        <feature version='${project.version}'>odl-mdsal-broker</feature>
+        <feature version='${project.version}'>odl-mdsal-clustering-commons</feature>
+        <feature version='${akka.version}'>odl-akka-clustering</feature>
+        <feature version='0.7'>odl-akka-leveldb</feature>
+        <bundle>mvn:org.opendaylight.controller/sal-remoterpc-connector/${project.version}</bundle>
+    </feature>
+    <feature name ='odl-mdsal-clustering' version='${project.version}'>
+        <feature version='${project.version}'>odl-mdsal-remoterpc-connector</feature>
+        <feature version='${project.version}'>odl-mdsal-distributed-datastore</feature>
+        <configfile finalname="${config.configfile.directory}/${config.clustering.configfile}">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/config</configfile>
+        <configfile finalname="configuration/initial/akka.conf">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/akkaconf</configfile>
+        <configfile finalname="configuration/initial/module-shards.conf">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/moduleshardconf</configfile>
+        <configfile finalname="configuration/initial/modules.conf">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/moduleconf</configfile>
+    </feature>
 </features>
index d18d227..46f83fb 100644 (file)
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>netconf-auth</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>netconf-tcp</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>netconf-ssh</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.bouncycastle</groupId>
+      <artifactId>bcpkix-jdk15on</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.bouncycastle</groupId>
+      <artifactId>bcprov-jdk15on</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>ietf-netconf-monitoring</artifactId>
       <type>xml</type>
       <classifier>config</classifier>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>netconf-connector-config</artifactId>
+      <version>${config.version}</version>
+      <type>xml</type>
+      <classifier>config</classifier>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>netconf-monitoring</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.aaa</groupId>
+      <artifactId>features-aaa</artifactId>
+      <version>${aaa.version}</version>
+      <classifier>features</classifier>
+      <type>xml</type>
+    </dependency>
   </dependencies>
 
   <build>
index 0a63562..4157212 100644 (file)
@@ -5,11 +5,15 @@
           xsi:schemaLocation="http://karaf.apache.org/xmlns/features/v1.2.0 http://karaf.apache.org/xmlns/features/v1.2.0">
   <repository>mvn:org.opendaylight.controller/features-protocol-framework/${protocol-framework.version}/xml/features</repository>
   <repository>mvn:org.opendaylight.controller/features-config/${config.version}/xml/features</repository>
+  <repository>mvn:org.opendaylight.aaa/features-aaa/${aaa.version}/xml/features</repository>
+
   <feature name='odl-netconf-all' version='${project.version}' description="OpenDaylight :: Netconf :: All">
     <feature version='${project.version}'>odl-netconf-api</feature>
     <feature version='${project.version}'>odl-netconf-mapping-api</feature>
     <feature version='${project.version}'>odl-netconf-util</feature>
     <feature version='${project.version}'>odl-netconf-impl</feature>
+    <feature version='${project.version}'>odl-netconf-tcp</feature>
+    <feature version='${project.version}'>odl-netconf-ssh</feature>
     <feature version='${project.version}'>odl-config-netconf-connector</feature>
     <feature version='${project.version}'>odl-netconf-netty-util</feature>
     <feature version='${project.version}'>odl-netconf-client</feature>
     <feature version='${project.version}'>odl-netconf-mapping-api</feature>
     <feature version='${project.version}'>odl-netconf-util</feature>
     <feature version='${project.version}'>odl-netconf-netty-util</feature>
+    <!-- Netconf server without config connector is just an empty shell -->
+    <feature version='${project.version}'>odl-config-netconf-connector</feature>
+    <!-- Netconf will not provide schemas without monitoring -->
+    <feature version='${project.version}'>odl-netconf-monitoring</feature>
     <bundle>mvn:org.opendaylight.controller/netconf-impl/${project.version}</bundle>
   </feature>
+  <feature name='odl-netconf-ssh' version='${project.version}' description="OpenDaylight :: Netconf :: SSSH">
+    <feature version='${project.version}'>odl-netconf-tcp</feature>
+    <feature version='${aaa.version}'>odl-aaa-authn-plugin</feature>
+    <bundle>mvn:org.opendaylight.controller/netconf-ssh/${project.version}</bundle>
+    <bundle>mvn:org.bouncycastle/bcpkix-jdk15on/${bouncycastle.version}</bundle>
+    <bundle>mvn:org.bouncycastle/bcprov-jdk15on/${bouncycastle.version}</bundle>
+  </feature>
+  <feature name='odl-netconf-tcp' version='${project.version}' description="OpenDaylight :: Netconf :: TCP">
+    <feature version='${project.version}'>odl-netconf-impl</feature>
+    <bundle>mvn:org.opendaylight.controller/netconf-tcp/${project.version}</bundle>
+  </feature>
   <feature name='odl-config-netconf-connector' version='${project.version}' description="OpenDaylight :: Netconf :: Connector">
     <feature version='${config.version}'>odl-config-manager</feature>
     <feature version='${project.version}'>odl-netconf-api</feature>
@@ -64,7 +83,6 @@
   <feature name='odl-netconf-client' version='${project.version}' description="OpenDaylight :: Netconf :: Client">
     <feature version='${project.version}'>odl-netconf-netty-util</feature>
     <bundle>mvn:org.opendaylight.controller/netconf-client/${project.version}</bundle>
-    <configfile finalname='${config.configfile.directory}/${config.netconf.client.configfile}'>mvn:org.opendaylight.controller/netconf-config/${netconf.version}/xml/config</configfile>
   </feature>
   <feature name='odl-netconf-monitoring' version='${project.version}' description="OpenDaylight :: Netconf :: Monitoring">
     <feature version='${project.version}'>odl-netconf-util</feature>
index 039060b..01156cf 100644 (file)
@@ -26,5 +26,6 @@
     <module>netconf</module>
     <module>protocol-framework</module>
     <module>adsal-compatibility</module>
+    <module>akka</module>
   </modules>
 </project>
\ No newline at end of file
index 2bc099d..2e817b9 100644 (file)
@@ -67,7 +67,9 @@
     <concepts.version>0.5.2-SNAPSHOT</concepts.version>
     <concurrentlinkedhashmap.version>1.4</concurrentlinkedhashmap.version>
     <config.version>0.2.5-SNAPSHOT</config.version>
+    <aaa.version>0.1.0-SNAPSHOT</aaa.version>
     <config.configfile.directory>etc/opendaylight/karaf</config.configfile.directory>
+    <config.clustering.configfile>05-clustering.xml</config.clustering.configfile>
     <config.netty.configfile>00-netty.xml</config.netty.configfile>
     <config.mdsal.configfile>01-mdsal.xml</config.mdsal.configfile>
     <config.xsql.configfile>04-xsql.xml</config.xsql.configfile>
     <topologymanager.shell.version>1.0.0-SNAPSHOT</topologymanager.shell.version>
     <troubleshoot.web.version>0.4.2-SNAPSHOT</troubleshoot.web.version>
     <typesafe.config.version>1.2.0</typesafe.config.version>
-    <uncommons.maths.version>1.2.2</uncommons.maths.version>
+    <uncommons.maths.version>1.2.2a</uncommons.maths.version>
     <usermanager.implementation.version>0.4.2-SNAPSHOT</usermanager.implementation.version>
     <usermanager.northbound.version>0.0.2-SNAPSHOT</usermanager.northbound.version>
     <usermanager.version>0.4.2-SNAPSHOT</usermanager.version>
index 00495a3..e34a5d3 100644 (file)
   <artifactId>opendaylight-karaf-resources</artifactId>
   <description>Resources for opendaylight-karaf</description>
   <packaging>jar</packaging>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <version>2.6</version>
+        <executions>
+          <execution>
+            <id>copy</id>
+            <goals>
+              <goal>copy</goal>
+            </goals>
+            <!-- here the phase you need -->
+            <phase>generate-resources</phase>
+            <configuration>
+              <artifactItems>
+                  <!-- Needs to be copied to lib/ext in order to start bouncy provider for mina sshd -->
+                <artifactItem>
+                    <groupId>org.bouncycastle</groupId>
+                    <artifactId>bcprov-jdk15on</artifactId>
+                    <version>${bouncycastle.version}</version>
+                    <outputDirectory>target/classes/lib/ext</outputDirectory>
+                    <destFileName>bcprov-jdk15on-${bouncycastle.version}.jar</destFileName>
+                </artifactItem>
+              </artifactItems>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
 </project>
index c2ac77a..8a2aa59 100644 (file)
@@ -37,6 +37,8 @@ netconf.tcp.client.port=8383
 netconf.ssh.address=0.0.0.0
 netconf.ssh.port=1830
 netconf.ssh.pk.path = ./configuration/RSA.pk
+# Set security provider to BouncyCastle
+org.apache.karaf.security.providers = org.bouncycastle.jce.provider.BouncyCastleProvider
 
 
 netconf.config.persister.active=1
@@ -92,6 +94,11 @@ ovsdb.listenPort=6640
 # default Openflow version = 1.0, we also support 1.3.
 # ovsdb.of.version=1.3
 
+# ovsdb can be configured with ml2 to perform l3 forwarding. When used in that scenario, the mac address of the default
+# gateway --on the external subnet-- is expected to be resolved from its inet address. The config below overrides that
+# specific arp/neighDiscovery lookup.
+# ovsdb.l3gateway.mac=00:00:5E:00:02:01
+
 # TLS configuration
 # To enable TLS, set secureChannelEnabled=true and specify the location of controller Java KeyStore and TrustStore files.
 # The Java KeyStore contains controller's private key and certificate. The Java TrustStore contains the trusted certificate
index cdc5924..795f68c 100644 (file)
                   <outputDirectory>target/assembly/lib</outputDirectory>
                   <destFileName>karaf.branding-${branding.version}.jar</destFileName>
                 </artifactItem>
+                  <!-- Needs to be copied to lib/ext in order to start bouncy provider for mina sshd -->
+              <artifactItem>
+                  <groupId>org.bouncycastle</groupId>
+                  <artifactId>bcprov-jdk15on</artifactId>
+                  <version>${bouncycastle.version}</version>
+                  <outputDirectory>target/assembly/lib/ext</outputDirectory>
+                  <destFileName>bcprov-jdk15on-${bouncycastle.version}.jar</destFileName>
+              </artifactItem>
               </artifactItems>
             </configuration>
           </execution>
index 7f9f56f..fcb452f 100644 (file)
           <groupId>org.opendaylight.controller</groupId>
           <artifactId>sal-restconf-broker</artifactId>
         </dependency>
-        <dependency>
-          <groupId>org.opendaylight.controller</groupId>
-          <artifactId>sal-remoterpc-connector</artifactId>
-        </dependency>
 
 
         <dependency>
           <artifactId>jeromq</artifactId>
           <version>0.3.1</version>
         </dependency>
-        <dependency>
-          <groupId>org.opendaylight.controller</groupId>
-          <artifactId>sal-distributed-datastore</artifactId>
-        </dependency>
         <dependency>
           <groupId>org.opendaylight.controller</groupId>
           <artifactId>sal-clustering-config</artifactId>
index b2fc3cb..530e46e 100644 (file)
@@ -116,6 +116,11 @@ ovsdb.listenPort=6640
 # default Openflow version = 1.3, we also support 1.0.
 ovsdb.of.version=1.3
 
+# ovsdb can be configured with ml2 to perform l3 forwarding. When used in that scenario, the mac address of the default
+# gateway --on the external subnet-- is expected to be resolved from its inet address. The config below overrides that
+# specific arp/neighDiscovery lookup.
+# ovsdb.l3gateway.mac=00:00:5E:00:02:01
+
 # TLS configuration
 # To enable TLS, set secureChannelEnabled=true and specify the location of controller Java KeyStore and TrustStore files.
 # The Java KeyStore contains controller's private key and certificate. The Java TrustStore contains the trusted certificate
index 325005b..98c81c2 100644 (file)
@@ -97,9 +97,8 @@
         <configuration>
           <instructions>
             <Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
-            <Export-package></Export-package>
-            <Private-Package></Private-Package>
-            <Import-Package></Import-Package>
+            <Export-package>org.opendaylight.cluster.raft</Export-package>
+            <Import-Package>*</Import-Package>
           </instructions>
         </configuration>
       </plugin>
index cbd7ca2..c4ff108 100644 (file)
@@ -12,14 +12,21 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.japi.Creator;
 import com.google.common.base.Optional;
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
 import org.opendaylight.controller.cluster.example.messages.PrintRole;
 import org.opendaylight.controller.cluster.example.messages.PrintState;
 import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -82,14 +89,63 @@ public class ExampleActor extends RaftActor {
         }
     }
 
-    @Override protected Object createSnapshot() {
-        return state;
+    @Override protected void createSnapshot() {
+        ByteString bs = null;
+        try {
+            bs = fromObject(state);
+        } catch (Exception e) {
+            LOG.error("Exception in creating snapshot", e);
+        }
+        getSelf().tell(new CaptureSnapshotReply(bs), null);
     }
 
-    @Override protected void applySnapshot(Object snapshot) {
+    @Override protected void applySnapshot(ByteString snapshot) {
         state.clear();
-        state.putAll((HashMap) snapshot);
-        LOG.debug("Snapshot applied to state :" + ((HashMap) snapshot).size());
+        try {
+            state.putAll((HashMap) toObject(snapshot));
+        } catch (Exception e) {
+           LOG.error("Exception in applying snapshot", e);
+        }
+        LOG.debug("Snapshot applied to state :" + ((HashMap) state).size());
+    }
+
+    private ByteString fromObject(Object snapshot) throws Exception {
+        ByteArrayOutputStream b = null;
+        ObjectOutputStream o = null;
+        try {
+            b = new ByteArrayOutputStream();
+            o = new ObjectOutputStream(b);
+            o.writeObject(snapshot);
+            byte[] snapshotBytes = b.toByteArray();
+            return ByteString.copyFrom(snapshotBytes);
+        } finally {
+            if (o != null) {
+                o.flush();
+                o.close();
+            }
+            if (b != null) {
+                b.close();
+            }
+        }
+    }
+
+    private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
+        Object obj = null;
+        ByteArrayInputStream bis = null;
+        ObjectInputStream ois = null;
+        try {
+            bis = new ByteArrayInputStream(bs.toByteArray());
+            ois = new ObjectInputStream(bis);
+            obj = ois.readObject();
+        } finally {
+            if (bis != null) {
+                bis.close();
+            }
+            if (ois != null) {
+                ois.close();
+            }
+        }
+        return obj;
     }
 
     @Override protected void onStateChanged() {
index d11377d..6192cad 100644 (file)
@@ -17,4 +17,9 @@ public class ExampleConfigParamsImpl extends DefaultConfigParamsImpl {
     public long getSnapshotBatchCount() {
         return 50;
     }
+
+    @Override
+    public int getSnapshotChunkSize() {
+        return 50;
+    }
 }
index fd6e192..978ea91 100644 (file)
@@ -109,6 +109,8 @@ public class TestDriver {
                 td.printState();
             } else if (command.startsWith("printNodes")) {
                 td.printNodes();
+            } else {
+                System.out.println("Invalid command:" + command);
             }
 
         }
index b5b034a..b436bce 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import com.google.protobuf.ByteString;
+
 import java.util.ArrayList;
 import java.util.List;
 
@@ -16,12 +18,18 @@ import java.util.List;
  */
 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
-    protected final List<ReplicatedLogEntry> journal;
-    protected final Object snapshot;
+    protected List<ReplicatedLogEntry> journal;
+    protected ByteString snapshot;
     protected long snapshotIndex = -1;
     protected long snapshotTerm = -1;
 
-    public AbstractReplicatedLogImpl(Object state, long snapshotIndex,
+    // to be used for rollback during save snapshot failure
+    protected List<ReplicatedLogEntry> snapshottedJournal;
+    protected ByteString previousSnapshot;
+    protected long previousSnapshotIndex = -1;
+    protected long previousSnapshotTerm = -1;
+
+    public AbstractReplicatedLogImpl(ByteString state, long snapshotIndex,
         long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
         this.snapshot = state;
         this.snapshotIndex = snapshotIndex;
@@ -137,11 +145,11 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     @Override
     public boolean isInSnapshot(long logEntryIndex) {
-        return logEntryIndex <= snapshotIndex;
+        return logEntryIndex <= snapshotIndex && snapshotIndex != -1;
     }
 
     @Override
-    public Object getSnapshot() {
+    public ByteString getSnapshot() {
         return snapshot;
     }
 
@@ -160,4 +168,68 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     @Override
     public abstract void removeFromAndPersist(long index);
+
+    @Override
+    public void setSnapshotIndex(long snapshotIndex) {
+        this.snapshotIndex = snapshotIndex;
+    }
+
+    @Override
+    public void setSnapshotTerm(long snapshotTerm) {
+        this.snapshotTerm = snapshotTerm;
+    }
+
+    @Override
+    public void setSnapshot(ByteString snapshot) {
+        this.snapshot = snapshot;
+    }
+
+    @Override
+    public void clear(int startIndex, int endIndex) {
+        journal.subList(startIndex, endIndex).clear();
+    }
+
+    @Override
+    public void snapshotPreCommit(ByteString snapshot, long snapshotCapturedIndex, long snapshotCapturedTerm) {
+        snapshottedJournal = new ArrayList<>(journal.size());
+
+        snapshottedJournal.addAll(journal.subList(0, (int)(snapshotCapturedIndex - snapshotIndex)));
+        clear(0, (int) (snapshotCapturedIndex - snapshotIndex));
+
+        previousSnapshotIndex = snapshotIndex;
+        setSnapshotIndex(snapshotCapturedIndex);
+
+        previousSnapshotTerm = snapshotTerm;
+        setSnapshotTerm(snapshotCapturedTerm);
+
+        previousSnapshot = getSnapshot();
+        setSnapshot(snapshot);
+    }
+
+    @Override
+    public void snapshotCommit() {
+        snapshottedJournal.clear();
+        snapshottedJournal = null;
+        previousSnapshotIndex = -1;
+        previousSnapshotTerm = -1;
+        previousSnapshot = null;
+    }
+
+    @Override
+    public void snapshotRollback() {
+        snapshottedJournal.addAll(journal);
+        journal.clear();
+        journal = snapshottedJournal;
+        snapshottedJournal = null;
+
+        snapshotIndex = previousSnapshotIndex;
+        previousSnapshotIndex = -1;
+
+        snapshotTerm = previousSnapshotTerm;
+        previousSnapshotTerm = -1;
+
+        snapshot = previousSnapshot;
+        previousSnapshot = null;
+
+    }
 }
index 4c6434a..ed6439d 100644 (file)
@@ -52,4 +52,9 @@ public interface ConfigParams {
      * @return int
      */
     public int getElectionTimeVariance();
+
+    /**
+     * The size (in bytes) of the snapshot chunk sent from Leader
+     */
+    public int getSnapshotChunkSize();
 }
index 6432fa4..75c237f 100644 (file)
@@ -25,6 +25,8 @@ public class DefaultConfigParamsImpl implements ConfigParams {
      */
     private static final int ELECTION_TIME_MAX_VARIANCE = 100;
 
+    private final int SNAPSHOT_CHUNK_SIZE = 2048 * 1000; //2MB
+
 
     /**
      * The interval at which a heart beat message will be sent to the remote
@@ -58,4 +60,9 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     public int getElectionTimeVariance() {
         return ELECTION_TIME_MAX_VARIANCE;
     }
+
+    @Override
+    public int getSnapshotChunkSize() {
+        return SNAPSHOT_CHUNK_SIZE;
+    }
 }
index 988789b..8135d83 100644 (file)
@@ -19,10 +19,14 @@ import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
 import akka.persistence.UntypedPersistentActor;
+import com.google.common.base.Optional;
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
-import com.google.common.base.Optional;
+import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
@@ -31,10 +35,11 @@ import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 
 import java.io.Serializable;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -98,6 +103,9 @@ public abstract class RaftActor extends UntypedPersistentActor {
      */
     private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
 
+    private CaptureSnapshot captureSnapshot = null;
+
+    private volatile boolean hasSnapshotCaptureInitiated = false;
 
     public RaftActor(String id, Map<String, String> peerAddresses) {
         this(id, peerAddresses, Optional.<ConfigParams>absent());
@@ -125,6 +133,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
             replicatedLog = new ReplicatedLogImpl(snapshot);
 
             context.setReplicatedLog(replicatedLog);
+            context.setLastApplied(snapshot.getLastAppliedIndex());
 
             LOG.debug("Applied snapshot to replicatedLog. " +
                 "snapshotIndex={}, snapshotTerm={}, journal-size={}",
@@ -132,7 +141,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
                 replicatedLog.size());
 
             // Apply the snapshot to the actors state
-            applySnapshot(snapshot.getState());
+            applySnapshot(ByteString.copyFrom(snapshot.getState()));
 
         } else if (message instanceof ReplicatedLogEntry) {
             replicatedLog.append((ReplicatedLogEntry) message);
@@ -164,7 +173,17 @@ public abstract class RaftActor extends UntypedPersistentActor {
                 applyState.getReplicatedLogEntry().getData());
 
         } else if(message instanceof ApplySnapshot ) {
-            applySnapshot(((ApplySnapshot) message).getSnapshot());
+            Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
+
+            LOG.debug("ApplySnapshot called on Follower Actor " +
+                "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
+                snapshot.getLastAppliedTerm());
+            applySnapshot(ByteString.copyFrom(snapshot.getState()));
+
+            //clears the followers log, sets the snapshot index to ensure adjusted-index works
+            replicatedLog = new ReplicatedLogImpl(snapshot);
+            context.setReplicatedLog(replicatedLog);
+            context.setLastApplied(snapshot.getLastAppliedIndex());
 
         } else if (message instanceof FindLeader) {
             getSender().tell(
@@ -174,13 +193,26 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
         } else if (message instanceof SaveSnapshotSuccess) {
             SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
+            LOG.info("SaveSnapshotSuccess received for snapshot");
+
+            context.getReplicatedLog().snapshotCommit();
 
             // TODO: Not sure if we want to be this aggressive with trimming stuff
             trimPersistentData(success.metadata().sequenceNr());
 
         } else if (message instanceof SaveSnapshotFailure) {
+            SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
+
+            LOG.info("saveSnapshotFailure.metadata():{}", saveSnapshotFailure.metadata().toString());
+            LOG.error(saveSnapshotFailure.cause(), "SaveSnapshotFailure received for snapshot Cause:");
 
-            // TODO: Handle failure in saving the snapshot
+            context.getReplicatedLog().snapshotRollback();
+
+            LOG.info("Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
+                "snapshotIndex:{}, snapshotTerm:{}, log-size:{}",
+                context.getReplicatedLog().getSnapshotIndex(),
+                context.getReplicatedLog().getSnapshotTerm(),
+                context.getReplicatedLog().size());
 
         } else if (message instanceof AddRaftPeer){
 
@@ -196,7 +228,25 @@ public abstract class RaftActor extends UntypedPersistentActor {
             RemoveRaftPeer rrp = (RemoveRaftPeer)message;
             context.removePeer(rrp.getName());
 
+        } else if (message instanceof CaptureSnapshot) {
+            LOG.debug("CaptureSnapshot received by actor");
+            CaptureSnapshot cs = (CaptureSnapshot)message;
+            captureSnapshot = cs;
+            createSnapshot();
+
+        } else if (message instanceof CaptureSnapshotReply){
+            LOG.debug("CaptureSnapshotReply received by actor");
+            CaptureSnapshotReply csr = (CaptureSnapshotReply) message;
+
+            ByteString stateInBytes = csr.getSnapshot();
+            LOG.debug("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
+            handleCaptureSnapshotReply(stateInBytes);
+
         } else {
+            if (!(message instanceof AppendEntriesMessages.AppendEntries)
+                && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
+                LOG.debug("onReceiveCommand: message:" + message.getClass());
+            }
 
             RaftState state =
                 currentBehavior.handleMessage(getSender(), message);
@@ -264,6 +314,10 @@ public abstract class RaftActor extends UntypedPersistentActor {
     protected ActorSelection getLeader(){
         String leaderAddress = getLeaderAddress();
 
+        if(leaderAddress == null){
+            return null;
+        }
+
         return context.actorSelection(leaderAddress);
     }
 
@@ -344,7 +398,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
      *
      * @return The current state of the actor
      */
-    protected abstract Object createSnapshot();
+    protected abstract void createSnapshot();
 
     /**
      * This method will be called by the RaftActor during recovery to
@@ -356,7 +410,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
      *
      * @param snapshot A snapshot of the state of the actor
      */
-    protected abstract void applySnapshot(Object snapshot);
+    protected abstract void applySnapshot(ByteString snapshot);
 
     /**
      * This method will be called by the RaftActor when the state of the
@@ -423,11 +477,39 @@ public abstract class RaftActor extends UntypedPersistentActor {
         return peerAddress;
     }
 
+    private void handleCaptureSnapshotReply(ByteString stateInBytes) {
+        // create a snapshot object from the state provided and save it
+        // when snapshot is saved async, SaveSnapshotSuccess is raised.
+
+        Snapshot sn = Snapshot.create(stateInBytes.toByteArray(),
+            context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
+            captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
+            captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
+
+        saveSnapshot(sn);
+
+        LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
+
+        //be greedy and remove entries from in-mem journal which are in the snapshot
+        // and update snapshotIndex and snapshotTerm without waiting for the success,
+
+        context.getReplicatedLog().snapshotPreCommit(stateInBytes,
+            captureSnapshot.getLastAppliedIndex(),
+            captureSnapshot.getLastAppliedTerm());
+
+        LOG.info("Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
+            "and term:{}", captureSnapshot.getLastAppliedIndex(),
+            captureSnapshot.getLastAppliedTerm());
+
+        captureSnapshot = null;
+        hasSnapshotCaptureInitiated = false;
+    }
+
 
     private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
 
         public ReplicatedLogImpl(Snapshot snapshot) {
-            super(snapshot.getState(),
+            super(ByteString.copyFrom(snapshot.getState()),
                 snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
                 snapshot.getUnAppliedEntries());
         }
@@ -476,8 +558,10 @@ public abstract class RaftActor extends UntypedPersistentActor {
             persist(replicatedLogEntry,
                 new Procedure<ReplicatedLogEntry>() {
                     public void apply(ReplicatedLogEntry evt) throws Exception {
-                        // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned.
-                        if (journal.size() > context.getConfigParams().getSnapshotBatchCount()) {
+                        // when a snaphsot is being taken, captureSnapshot != null
+                        if (hasSnapshotCaptureInitiated == false &&
+                            journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0) {
+
                             LOG.info("Initiating Snapshot Capture..");
                             long lastAppliedIndex = -1;
                             long lastAppliedTerm = -1;
@@ -493,26 +577,11 @@ public abstract class RaftActor extends UntypedPersistentActor {
                             LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
                             LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
 
-                            // create a snapshot object from the state provided and save it
-                            // when snapshot is saved async, SaveSnapshotSuccess is raised.
-                            Snapshot sn = Snapshot.create(createSnapshot(),
-                                getFrom(context.getLastApplied() + 1),
-                                lastIndex(), lastTerm(), lastAppliedIndex,
-                                lastAppliedTerm);
-                            saveSnapshot(sn);
-
-                            LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
-
-                            //be greedy and remove entries from in-mem journal which are in the snapshot
-                            // and update snapshotIndex and snapshotTerm without waiting for the success,
-                            // TODO: damage-recovery to be done on failure
-                            journal.subList(0, (int) (lastAppliedIndex - snapshotIndex)).clear();
-                            snapshotIndex = lastAppliedIndex;
-                            snapshotTerm = lastAppliedTerm;
-
-                            LOG.info("Removed in-memory snapshotted entries, " +
-                                "adjusted snaphsotIndex:{}" +
-                                "and term:{}", snapshotIndex, lastAppliedTerm);
+                            // send a CaptureSnapshot to self to make the expensive operation async.
+                            getSelf().tell(new CaptureSnapshot(
+                                lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
+                                null);
+                            hasSnapshotCaptureInitiated = true;
                         }
                         // Send message for replication
                         if (clientActor != null) {
@@ -542,65 +611,6 @@ public abstract class RaftActor extends UntypedPersistentActor {
     }
 
 
-    private static class Snapshot implements Serializable {
-        private final Object state;
-        private final List<ReplicatedLogEntry> unAppliedEntries;
-        private final long lastIndex;
-        private final long lastTerm;
-        private final long lastAppliedIndex;
-        private final long lastAppliedTerm;
-
-        private Snapshot(Object state,
-            List<ReplicatedLogEntry> unAppliedEntries, long lastIndex,
-            long lastTerm, long lastAppliedIndex, long lastAppliedTerm) {
-            this.state = state;
-            this.unAppliedEntries = unAppliedEntries;
-            this.lastIndex = lastIndex;
-            this.lastTerm = lastTerm;
-            this.lastAppliedIndex = lastAppliedIndex;
-            this.lastAppliedTerm = lastAppliedTerm;
-        }
-
-
-        public static Snapshot create(Object state,
-            List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
-            long lastAppliedIndex, long lastAppliedTerm) {
-            return new Snapshot(state, entries, lastIndex, lastTerm,
-                lastAppliedIndex, lastAppliedTerm);
-        }
-
-        public Object getState() {
-            return state;
-        }
-
-        public List<ReplicatedLogEntry> getUnAppliedEntries() {
-            return unAppliedEntries;
-        }
-
-        public long getLastTerm() {
-            return lastTerm;
-        }
-
-        public long getLastAppliedIndex() {
-            return lastAppliedIndex;
-        }
-
-        public long getLastAppliedTerm() {
-            return lastAppliedTerm;
-        }
-
-        public String getLogMessage() {
-            StringBuilder sb = new StringBuilder();
-            return sb.append("Snapshot={")
-                .append("lastTerm:" + this.getLastTerm()  + ", ")
-                .append("LastAppliedIndex:" + this.getLastAppliedIndex()  + ", ")
-                .append("LastAppliedTerm:" + this.getLastAppliedTerm()  + ", ")
-                .append("UnAppliedEntries size:" + this.getUnAppliedEntries().size()  + "}")
-                .toString();
-
-        }
-    }
-
     private class ElectionTermImpl implements ElectionTerm {
         /**
          * Identifier of the actor whose election term information this is
index e6e160b..c17f544 100644 (file)
@@ -8,6 +8,8 @@
 
 package org.opendaylight.controller.cluster.raft;
 
+import com.google.protobuf.ByteString;
+
 import java.util.List;
 
 /**
@@ -118,7 +120,7 @@ public interface ReplicatedLog {
      *
      * @return an object representing the snapshot if it exists. null otherwise
      */
-    Object getSnapshot();
+    ByteString getSnapshot();
 
     /**
      * Get the index of the snapshot
@@ -134,4 +136,49 @@ public interface ReplicatedLog {
      * otherwise
      */
     long getSnapshotTerm();
+
+    /**
+     * sets the snapshot index in the replicated log
+     * @param snapshotIndex
+     */
+    void setSnapshotIndex(long snapshotIndex);
+
+    /**
+     * sets snapshot term
+     * @param snapshotTerm
+     */
+    public void setSnapshotTerm(long snapshotTerm);
+
+    /**
+     * sets the snapshot in bytes
+     * @param snapshot
+     */
+    public void setSnapshot(ByteString snapshot);
+
+    /**
+     * Clears the journal entries with startIndex(inclusive) and endIndex (exclusive)
+     * @param startIndex
+     * @param endIndex
+     */
+    public void clear(int startIndex, int endIndex);
+
+    /**
+     * Handles all the bookkeeping in order to perform a rollback in the
+     * event of SaveSnapshotFailure
+     * @param snapshot
+     * @param snapshotCapturedIndex
+     * @param snapshotCapturedTerm
+     */
+    public void snapshotPreCommit(ByteString snapshot,
+        long snapshotCapturedIndex, long snapshotCapturedTerm);
+
+    /**
+     * Sets the Replicated log to state after snapshot success.
+     */
+    public void snapshotCommit();
+
+    /**
+     * Restores the replicated log to a state in the event of a save snapshot failure
+     */
+    public void snapshotRollback();
 }
index 374e0fa..2f5ba48 100644 (file)
@@ -9,12 +9,16 @@
 package org.opendaylight.controller.cluster.raft;
 
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 
 public class SerializationUtils {
 
     public static Object fromSerializable(Object serializable){
         if(serializable.getClass().equals(AppendEntries.SERIALIZABLE_CLASS)){
             return AppendEntries.fromSerializable(serializable);
+
+        } else if (serializable.getClass().equals(InstallSnapshot.SERIALIZABLE_CLASS)) {
+            return InstallSnapshot.fromSerializable(serializable);
         }
         return serializable;
     }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java
new file mode 100644 (file)
index 0000000..8e0fcca
--- /dev/null
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import java.io.Serializable;
+import java.util.List;
+
+
+public class Snapshot implements Serializable {
+    private final byte[] state;
+    private final List<ReplicatedLogEntry> unAppliedEntries;
+    private final long lastIndex;
+    private final long lastTerm;
+    private final long lastAppliedIndex;
+    private final long lastAppliedTerm;
+
+    private Snapshot(byte[] state,
+        List<ReplicatedLogEntry> unAppliedEntries, long lastIndex,
+        long lastTerm, long lastAppliedIndex, long lastAppliedTerm) {
+        this.state = state;
+        this.unAppliedEntries = unAppliedEntries;
+        this.lastIndex = lastIndex;
+        this.lastTerm = lastTerm;
+        this.lastAppliedIndex = lastAppliedIndex;
+        this.lastAppliedTerm = lastAppliedTerm;
+    }
+
+
+    public static Snapshot create(byte[] state,
+        List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
+        long lastAppliedIndex, long lastAppliedTerm) {
+        return new Snapshot(state, entries, lastIndex, lastTerm,
+            lastAppliedIndex, lastAppliedTerm);
+    }
+
+    public byte[] getState() {
+        return state;
+    }
+
+    public List<ReplicatedLogEntry> getUnAppliedEntries() {
+        return unAppliedEntries;
+    }
+
+    public long getLastTerm() {
+        return lastTerm;
+    }
+
+    public long getLastAppliedIndex() {
+        return lastAppliedIndex;
+    }
+
+    public long getLastAppliedTerm() {
+        return lastAppliedTerm;
+    }
+
+    public long getLastIndex() {
+        return this.lastIndex;
+    }
+
+    public String getLogMessage() {
+        StringBuilder sb = new StringBuilder();
+        return sb.append("Snapshot={")
+            .append("lastTerm:" + this.getLastTerm() + ", ")
+            .append("lastIndex:" + this.getLastIndex()  + ", ")
+            .append("LastAppliedIndex:" + this.getLastAppliedIndex()  + ", ")
+            .append("LastAppliedTerm:" + this.getLastAppliedTerm()  + ", ")
+            .append("UnAppliedEntries size:" + this.getUnAppliedEntries().size()  + "}")
+            .toString();
+
+    }
+}
index 9739fb2..c356804 100644 (file)
@@ -8,16 +8,21 @@
 
 package org.opendaylight.controller.cluster.raft.base.messages;
 
+import org.opendaylight.controller.cluster.raft.Snapshot;
+
 import java.io.Serializable;
 
+/**
+ * Internal message, issued by follower to its actor
+ */
 public class ApplySnapshot implements Serializable {
-    private final Object snapshot;
+    private final Snapshot snapshot;
 
-    public ApplySnapshot(Object snapshot) {
+    public ApplySnapshot(Snapshot snapshot) {
         this.snapshot = snapshot;
     }
 
-    public Object getSnapshot() {
+    public Snapshot getSnapshot() {
         return snapshot;
     }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java
new file mode 100644 (file)
index 0000000..bb86e1a
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+public class CaptureSnapshot {
+    private long lastAppliedIndex;
+    private long lastAppliedTerm;
+    private long lastIndex;
+    private long lastTerm;
+
+    public CaptureSnapshot(long lastIndex, long lastTerm,
+        long lastAppliedIndex, long lastAppliedTerm) {
+        this.lastIndex = lastIndex;
+        this.lastTerm = lastTerm;
+        this.lastAppliedIndex = lastAppliedIndex;
+        this.lastAppliedTerm = lastAppliedTerm;
+    }
+
+    public long getLastAppliedIndex() {
+        return lastAppliedIndex;
+    }
+
+    public long getLastAppliedTerm() {
+        return lastAppliedTerm;
+    }
+
+    public long getLastIndex() {
+        return lastIndex;
+    }
+
+    public long getLastTerm() {
+        return lastTerm;
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java
new file mode 100644 (file)
index 0000000..96150db
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+import com.google.protobuf.ByteString;
+
+public class CaptureSnapshotReply {
+    private ByteString snapshot;
+
+    public CaptureSnapshotReply(ByteString snapshot) {
+        this.snapshot = snapshot;
+    }
+
+    public ByteString getSnapshot() {
+        return snapshot;
+    }
+
+    public void setSnapshot(ByteString snapshot) {
+        this.snapshot = snapshot;
+    }
+}
index 251a13d..7e896fe 100644 (file)
@@ -305,6 +305,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      * @param index a log index that is known to be committed
      */
     protected void applyLogToStateMachine(final long index) {
+        long newLastApplied = context.getLastApplied();
         // Now maybe we apply to the state machine
         for (long i = context.getLastApplied() + 1;
              i < index + 1; i++) {
@@ -322,15 +323,19 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
             if (replicatedLogEntry != null) {
                 actor().tell(new ApplyState(clientActor, identifier,
                     replicatedLogEntry), actor());
+                newLastApplied = i;
             } else {
+                //if one index is not present in the log, no point in looping
+                // around as the rest wont be present either
                 context.getLogger().error(
-                    "Missing index " + i + " from log. Cannot apply state.");
+                    "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index );
+                break;
             }
         }
         // Send a local message to the local RaftActor (it's derived class to be
         // specific to apply the log to it's index)
-        context.getLogger().debug("Setting last applied to {}", index);
-        context.setLastApplied(index);
+        context.getLogger().debug("Setting last applied to {}", newLastApplied);
+        context.setLastApplied(newLastApplied);
     }
 
     protected Object fromSerializableMessage(Object serializable){
index 54e0494..610fdc9 100644 (file)
@@ -9,17 +9,22 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 
+import java.util.ArrayList;
+
 /**
  * The behavior of a RaftActor in the Follower state
  * <p/>
@@ -31,6 +36,8 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
  * </ul>
  */
 public class Follower extends AbstractRaftActorBehavior {
+    private ByteString snapshotChunksCollected = ByteString.EMPTY;
+
     public Follower(RaftActorContext context) {
         super(context);
 
@@ -106,6 +113,9 @@ public class Follower extends AbstractRaftActorBehavior {
         if (outOfSync) {
             // We found that the log was out of sync so just send a negative
             // reply and return
+            context.getLogger().debug("Follower is out-of-sync, " +
+                "so sending negative reply, lastIndex():{}, lastTerm():{}",
+                lastIndex(), lastTerm());
             sender.tell(
                 new AppendEntriesReply(context.getId(), currentTerm(), false,
                     lastIndex(), lastTerm()), actor()
@@ -191,7 +201,13 @@ public class Follower extends AbstractRaftActorBehavior {
 
         // If commitIndex > lastApplied: increment lastApplied, apply
         // log[lastApplied] to state machine (ยง5.3)
-        if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
+        // check if there are any entries to be applied. last-applied can be equal to last-index
+        if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
+            context.getLastApplied() < lastIndex()) {
+            context.getLogger().debug("applyLogToStateMachine, " +
+                "appendEntries.getLeaderCommit():{}," +
+                "context.getLastApplied():{}, lastIndex():{}",
+                appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex());
             applyLogToStateMachine(appendEntries.getLeaderCommit());
         }
 
@@ -234,7 +250,7 @@ public class Follower extends AbstractRaftActorBehavior {
 
         } else if (message instanceof InstallSnapshot) {
             InstallSnapshot installSnapshot = (InstallSnapshot) message;
-            actor().tell(new ApplySnapshot(installSnapshot.getData()), actor());
+            handleInstallSnapshot(sender, installSnapshot);
         }
 
         scheduleElection(electionDuration());
@@ -242,6 +258,47 @@ public class Follower extends AbstractRaftActorBehavior {
         return super.handleMessage(sender, message);
     }
 
+    private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
+        context.getLogger().debug("InstallSnapshot received by follower " +
+            "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(),
+            installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
+
+        try {
+            if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) {
+                // this is the last chunk, create a snapshot object and apply
+
+                snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+                context.getLogger().debug("Last chunk received: snapshotChunksCollected.size:{}",
+                    snapshotChunksCollected.size());
+
+                Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(),
+                    new ArrayList<ReplicatedLogEntry>(),
+                    installSnapshot.getLastIncludedIndex(),
+                    installSnapshot.getLastIncludedTerm(),
+                    installSnapshot.getLastIncludedIndex(),
+                    installSnapshot.getLastIncludedTerm());
+
+                actor().tell(new ApplySnapshot(snapshot), actor());
+
+            } else {
+                // we have more to go
+                snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+                context.getLogger().debug("Chunk={},snapshotChunksCollected.size:{}",
+                    installSnapshot.getChunkIndex(), snapshotChunksCollected.size());
+            }
+
+            sender.tell(new InstallSnapshotReply(
+                currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
+                true), actor());
+
+        } catch (Exception e) {
+            context.getLogger().error("Exception in InstallSnapshot of follower", e);
+            //send reply with success as false. The chunk will be sent again on failure
+            sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
+                installSnapshot.getChunkIndex(), false), actor());
+        }
+    }
+
     @Override public void close() throws Exception {
         stopElection();
     }
index 234f9db..90948ff 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
 import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
@@ -30,6 +31,7 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -64,8 +66,9 @@ import java.util.concurrent.atomic.AtomicLong;
 public class Leader extends AbstractRaftActorBehavior {
 
 
-    private final Map<String, FollowerLogInformation> followerToLog =
+    protected final Map<String, FollowerLogInformation> followerToLog =
         new HashMap();
+    protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
 
     private final Set<String> followers;
 
@@ -246,16 +249,48 @@ public class Leader extends AbstractRaftActorBehavior {
         return super.handleMessage(sender, message);
     }
 
-    private void handleInstallSnapshotReply(InstallSnapshotReply message) {
-        InstallSnapshotReply reply = message;
+    private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
         String followerId = reply.getFollowerId();
-        FollowerLogInformation followerLogInformation =
-            followerToLog.get(followerId);
+        FollowerToSnapshot followerToSnapshot =
+            mapFollowerToSnapshot.get(followerId);
+
+        if (followerToSnapshot != null &&
+            followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
+
+            if (reply.isSuccess()) {
+                if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
+                    //this was the last chunk reply
+                    context.getLogger().debug("InstallSnapshotReply received, " +
+                        "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
+                        reply.getChunkIndex(), followerId,
+                        context.getReplicatedLog().getSnapshotIndex() + 1);
+
+                    FollowerLogInformation followerLogInformation =
+                        followerToLog.get(followerId);
+                    followerLogInformation.setMatchIndex(
+                        context.getReplicatedLog().getSnapshotIndex());
+                    followerLogInformation.setNextIndex(
+                        context.getReplicatedLog().getSnapshotIndex() + 1);
+                    mapFollowerToSnapshot.remove(followerId);
+                    context.getLogger().debug("followerToLog.get(followerId).getNextIndex().get()=" +
+                        followerToLog.get(followerId).getNextIndex().get());
+
+                } else {
+                    followerToSnapshot.markSendStatus(true);
+                }
+            } else {
+                context.getLogger().info("InstallSnapshotReply received, " +
+                    "sending snapshot chunk failed, Will retry, Chunk:{}",
+                    reply.getChunkIndex());
+                followerToSnapshot.markSendStatus(false);
+            }
 
-        followerLogInformation
-            .setMatchIndex(context.getReplicatedLog().getSnapshotIndex());
-        followerLogInformation
-            .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1);
+        } else {
+            context.getLogger().error("ERROR!!" +
+                "FollowerId in InstallSnapshotReply not known to Leader" +
+                " or Chunk Index in InstallSnapshotReply not matching {} != {}",
+                followerToSnapshot.getChunkIndex(), reply.getChunkIndex() );
+        }
     }
 
     private void replicate(Replicate replicate) {
@@ -282,30 +317,56 @@ public class Leader extends AbstractRaftActorBehavior {
     private void sendAppendEntries() {
         // Send an AppendEntries to all followers
         for (String followerId : followers) {
-            ActorSelection followerActor =
-                context.getPeerActorSelection(followerId);
+            ActorSelection followerActor = context.getPeerActorSelection(followerId);
 
             if (followerActor != null) {
-                FollowerLogInformation followerLogInformation =
-                    followerToLog.get(followerId);
-
-                long nextIndex = followerLogInformation.getNextIndex().get();
-
+                FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+                long followerNextIndex = followerLogInformation.getNextIndex().get();
                 List<ReplicatedLogEntry> entries = Collections.emptyList();
 
-                if (context.getReplicatedLog().isPresent(nextIndex)) {
-                    // FIXME : Sending one entry at a time
-                    entries =
-                        context.getReplicatedLog().getFrom(nextIndex, 1);
+                if (mapFollowerToSnapshot.get(followerId) != null) {
+                    if (mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+                        sendSnapshotChunk(followerActor, followerId);
+                    }
+
+                } else {
+
+                    if (context.getReplicatedLog().isPresent(followerNextIndex)) {
+                        // FIXME : Sending one entry at a time
+                        entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
+
+                        followerActor.tell(
+                            new AppendEntries(currentTerm(), context.getId(),
+                                prevLogIndex(followerNextIndex),
+                                prevLogTerm(followerNextIndex), entries,
+                                context.getCommitIndex()).toSerializable(),
+                            actor()
+                        );
+
+                    } else {
+                        // if the followers next index is not present in the leaders log, then snapshot should be sent
+                        long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
+                        long leaderLastIndex = context.getReplicatedLog().lastIndex();
+                        if (followerNextIndex >= 0 && leaderLastIndex >= followerNextIndex ) {
+                            // if the follower is just not starting and leader's index
+                            // is more than followers index
+                            context.getLogger().debug("SendInstallSnapshot to follower:{}," +
+                                "follower-nextIndex:{}, leader-snapshot-index:{},  " +
+                                "leader-last-index:{}", followerId,
+                                followerNextIndex, leaderSnapShotIndex, leaderLastIndex);
+
+                            actor().tell(new SendInstallSnapshot(), actor());
+                        } else {
+                            followerActor.tell(
+                                new AppendEntries(currentTerm(), context.getId(),
+                                    prevLogIndex(followerNextIndex),
+                                    prevLogTerm(followerNextIndex), entries,
+                                    context.getCommitIndex()).toSerializable(),
+                                actor()
+                            );
+                        }
+                    }
                 }
-
-                followerActor.tell(
-                    new AppendEntries(currentTerm(), context.getId(),
-                        prevLogIndex(nextIndex),
-                        prevLogTerm(nextIndex), entries,
-                        context.getCommitIndex()).toSerializable(),
-                    actor()
-                );
             }
         }
     }
@@ -326,21 +387,55 @@ public class Leader extends AbstractRaftActorBehavior {
 
                 long nextIndex = followerLogInformation.getNextIndex().get();
 
-                if (!context.getReplicatedLog().isPresent(nextIndex) && context
-                    .getReplicatedLog().isInSnapshot(nextIndex)) {
-                    followerActor.tell(
-                        new InstallSnapshot(currentTerm(), context.getId(),
-                            context.getReplicatedLog().getSnapshotIndex(),
-                            context.getReplicatedLog().getSnapshotTerm(),
-                            context.getReplicatedLog().getSnapshot()
-                        ),
-                        actor()
-                    );
+                if (!context.getReplicatedLog().isPresent(nextIndex) &&
+                    context.getReplicatedLog().isInSnapshot(nextIndex)) {
+                    sendSnapshotChunk(followerActor, followerId);
                 }
             }
         }
     }
 
+    /**
+     *  Sends a snapshot chunk to a given follower
+     *  InstallSnapshot should qualify as a heartbeat too.
+     */
+    private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
+        try {
+            followerActor.tell(
+                new InstallSnapshot(currentTerm(), context.getId(),
+                    context.getReplicatedLog().getSnapshotIndex(),
+                    context.getReplicatedLog().getSnapshotTerm(),
+                    getNextSnapshotChunk(followerId,
+                        context.getReplicatedLog().getSnapshot()),
+                    mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
+                    mapFollowerToSnapshot.get(followerId).getTotalChunks()
+                ).toSerializable(),
+                actor()
+            );
+            context.getLogger().info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
+                followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
+                mapFollowerToSnapshot.get(followerId).getTotalChunks());
+        } catch (IOException e) {
+            context.getLogger().error("InstallSnapshot failed for Leader.", e);
+        }
+    }
+
+    /**
+     * Acccepts snaphot as ByteString, enters into map for future chunks
+     * creates and return a ByteString chunk
+     */
+    private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
+        FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+        if (followerToSnapshot == null) {
+            followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
+            mapFollowerToSnapshot.put(followerId, followerToSnapshot);
+        }
+        ByteString nextChunk = followerToSnapshot.getNextChunk();
+        context.getLogger().debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
+
+        return nextChunk;
+    }
+
     private RaftState sendHeartBeat() {
         if (followers.size() > 0) {
             sendAppendEntries();
@@ -410,4 +505,97 @@ public class Leader extends AbstractRaftActorBehavior {
         return context.getId();
     }
 
+    /**
+     * Encapsulates the snapshot bytestring and handles the logic of sending
+     * snapshot chunks
+     */
+    protected class FollowerToSnapshot {
+        private ByteString snapshotBytes;
+        private int offset = 0;
+        // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
+        private int replyReceivedForOffset;
+        // if replyStatus is false, the previous chunk is attempted
+        private boolean replyStatus = false;
+        private int chunkIndex;
+        private int totalChunks;
+
+        public FollowerToSnapshot(ByteString snapshotBytes) {
+            this.snapshotBytes = snapshotBytes;
+            replyReceivedForOffset = -1;
+            chunkIndex = 1;
+            int size = snapshotBytes.size();
+            totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
+                ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
+            context.getLogger().debug("Snapshot {} bytes, total chunks to send:{}",
+                size, totalChunks);
+        }
+
+        public ByteString getSnapshotBytes() {
+            return snapshotBytes;
+        }
+
+        public int incrementOffset() {
+            if(replyStatus) {
+                // if prev chunk failed, we would want to sent the same chunk again
+                offset = offset + context.getConfigParams().getSnapshotChunkSize();
+            }
+            return offset;
+        }
+
+        public int incrementChunkIndex() {
+            if (replyStatus) {
+                // if prev chunk failed, we would want to sent the same chunk again
+                chunkIndex =  chunkIndex + 1;
+            }
+            return chunkIndex;
+        }
+
+        public int getChunkIndex() {
+            return chunkIndex;
+        }
+
+        public int getTotalChunks() {
+            return totalChunks;
+        }
+
+        public boolean canSendNextChunk() {
+            // we only send a false if a chunk is sent but we have not received a reply yet
+            return replyReceivedForOffset == offset;
+        }
+
+        public boolean isLastChunk(int chunkIndex) {
+            return totalChunks == chunkIndex;
+        }
+
+        public void markSendStatus(boolean success) {
+            if (success) {
+                // if the chunk sent was successful
+                replyReceivedForOffset = offset;
+                replyStatus = true;
+            } else {
+                // if the chunk sent was failure
+                replyReceivedForOffset = offset;
+                replyStatus = false;
+            }
+        }
+
+        public ByteString getNextChunk() {
+            int snapshotLength = getSnapshotBytes().size();
+            int start = incrementOffset();
+            int size = context.getConfigParams().getSnapshotChunkSize();
+            if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
+                size = snapshotLength;
+            } else {
+                if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
+                    size = snapshotLength - start;
+                }
+            }
+
+            context.getLogger().debug("length={}, offset={},size={}",
+                snapshotLength, start, size);
+            return getSnapshotBytes().substring(start, start + size);
+
+        }
+    }
+
 }
index 888854f..9d40fa3 100644 (file)
@@ -8,19 +8,29 @@
 
 package org.opendaylight.controller.cluster.raft.messages;
 
+import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages;
+
 public class InstallSnapshot extends AbstractRaftRPC {
 
+    public static final Class SERIALIZABLE_CLASS = InstallSnapshotMessages.InstallSnapshot.class;
+
     private final String leaderId;
     private final long lastIncludedIndex;
     private final long lastIncludedTerm;
-    private final Object data;
+    private final ByteString data;
+    private final int chunkIndex;
+    private final int totalChunks;
 
-    public InstallSnapshot(long term, String leaderId, long lastIncludedIndex, long lastIncludedTerm, Object data) {
+    public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
+        long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks) {
         super(term);
         this.leaderId = leaderId;
         this.lastIncludedIndex = lastIncludedIndex;
         this.lastIncludedTerm = lastIncludedTerm;
         this.data = data;
+        this.chunkIndex = chunkIndex;
+        this.totalChunks = totalChunks;
     }
 
     public String getLeaderId() {
@@ -35,7 +45,38 @@ public class InstallSnapshot extends AbstractRaftRPC {
         return lastIncludedTerm;
     }
 
-    public Object getData() {
+    public ByteString getData() {
         return data;
     }
+
+    public int getChunkIndex() {
+        return chunkIndex;
+    }
+
+    public int getTotalChunks() {
+        return totalChunks;
+    }
+
+    public <T extends Object> Object toSerializable(){
+        return InstallSnapshotMessages.InstallSnapshot.newBuilder()
+            .setLeaderId(this.getLeaderId())
+            .setChunkIndex(this.getChunkIndex())
+            .setData(this.getData())
+            .setLastIncludedIndex(this.getLastIncludedIndex())
+            .setLastIncludedTerm(this.getLastIncludedTerm())
+            .setTotalChunks(this.getTotalChunks()).build();
+
+    }
+
+    public static InstallSnapshot fromSerializable (Object o) {
+        InstallSnapshotMessages.InstallSnapshot from =
+            (InstallSnapshotMessages.InstallSnapshot) o;
+
+        InstallSnapshot installSnapshot = new InstallSnapshot(from.getTerm(),
+            from.getLeaderId(), from.getLastIncludedIndex(),
+            from.getLastIncludedTerm(), from.getData(),
+            from.getChunkIndex(), from.getTotalChunks());
+
+        return installSnapshot;
+    }
 }
index 85b89b7..d293a47 100644 (file)
@@ -13,13 +13,26 @@ public class InstallSnapshotReply extends AbstractRaftRPC {
     // The followerId - this will be used to figure out which follower is
     // responding
     private final String followerId;
+    private final int chunkIndex;
+    private boolean success;
 
-    protected InstallSnapshotReply(long term, String followerId) {
+    public InstallSnapshotReply(long term, String followerId, int chunkIndex,
+        boolean success) {
         super(term);
         this.followerId = followerId;
+        this.chunkIndex = chunkIndex;
+        this.success = success;
     }
 
     public String getFollowerId() {
         return followerId;
     }
+
+    public int getChunkIndex() {
+        return chunkIndex;
+    }
+
+    public boolean isSuccess() {
+        return success;
+    }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/InstallSnapshotMessages.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/InstallSnapshotMessages.java
new file mode 100644 (file)
index 0000000..e801ae1
--- /dev/null
@@ -0,0 +1,1015 @@
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: InstallSnapshot.proto
+
+package org.opendaylight.controller.cluster.raft.protobuff.messages;
+
+public final class InstallSnapshotMessages {
+  private InstallSnapshotMessages() {}
+  public static void registerAllExtensions(
+      com.google.protobuf.ExtensionRegistry registry) {
+  }
+  public interface InstallSnapshotOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+
+    // optional int64 term = 1;
+    /**
+     * <code>optional int64 term = 1;</code>
+     */
+    boolean hasTerm();
+    /**
+     * <code>optional int64 term = 1;</code>
+     */
+    long getTerm();
+
+    // optional string leaderId = 2;
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    boolean hasLeaderId();
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    java.lang.String getLeaderId();
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    com.google.protobuf.ByteString
+        getLeaderIdBytes();
+
+    // optional int64 lastIncludedIndex = 3;
+    /**
+     * <code>optional int64 lastIncludedIndex = 3;</code>
+     */
+    boolean hasLastIncludedIndex();
+    /**
+     * <code>optional int64 lastIncludedIndex = 3;</code>
+     */
+    long getLastIncludedIndex();
+
+    // optional int64 lastIncludedTerm = 4;
+    /**
+     * <code>optional int64 lastIncludedTerm = 4;</code>
+     */
+    boolean hasLastIncludedTerm();
+    /**
+     * <code>optional int64 lastIncludedTerm = 4;</code>
+     */
+    long getLastIncludedTerm();
+
+    // optional bytes data = 5;
+    /**
+     * <code>optional bytes data = 5;</code>
+     */
+    boolean hasData();
+    /**
+     * <code>optional bytes data = 5;</code>
+     */
+    com.google.protobuf.ByteString getData();
+
+    // optional int32 chunkIndex = 6;
+    /**
+     * <code>optional int32 chunkIndex = 6;</code>
+     */
+    boolean hasChunkIndex();
+    /**
+     * <code>optional int32 chunkIndex = 6;</code>
+     */
+    int getChunkIndex();
+
+    // optional int32 totalChunks = 7;
+    /**
+     * <code>optional int32 totalChunks = 7;</code>
+     */
+    boolean hasTotalChunks();
+    /**
+     * <code>optional int32 totalChunks = 7;</code>
+     */
+    int getTotalChunks();
+  }
+  /**
+   * Protobuf type {@code org.opendaylight.controller.cluster.raft.InstallSnapshot}
+   */
+  public static final class InstallSnapshot extends
+      com.google.protobuf.GeneratedMessage
+      implements InstallSnapshotOrBuilder {
+    // Use InstallSnapshot.newBuilder() to construct.
+    private InstallSnapshot(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+      super(builder);
+      this.unknownFields = builder.getUnknownFields();
+    }
+    private InstallSnapshot(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+    private static final InstallSnapshot defaultInstance;
+    public static InstallSnapshot getDefaultInstance() {
+      return defaultInstance;
+    }
+
+    public InstallSnapshot getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+
+    private final com.google.protobuf.UnknownFieldSet unknownFields;
+    @java.lang.Override
+    public final com.google.protobuf.UnknownFieldSet
+        getUnknownFields() {
+      return this.unknownFields;
+    }
+    private InstallSnapshot(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      initFields();
+      int mutable_bitField0_ = 0;
+      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder();
+      try {
+        boolean done = false;
+        while (!done) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              done = true;
+              break;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                done = true;
+              }
+              break;
+            }
+            case 8: {
+              bitField0_ |= 0x00000001;
+              term_ = input.readInt64();
+              break;
+            }
+            case 18: {
+              bitField0_ |= 0x00000002;
+              leaderId_ = input.readBytes();
+              break;
+            }
+            case 24: {
+              bitField0_ |= 0x00000004;
+              lastIncludedIndex_ = input.readInt64();
+              break;
+            }
+            case 32: {
+              bitField0_ |= 0x00000008;
+              lastIncludedTerm_ = input.readInt64();
+              break;
+            }
+            case 42: {
+              bitField0_ |= 0x00000010;
+              data_ = input.readBytes();
+              break;
+            }
+            case 48: {
+              bitField0_ |= 0x00000020;
+              chunkIndex_ = input.readInt32();
+              break;
+            }
+            case 56: {
+              bitField0_ |= 0x00000040;
+              totalChunks_ = input.readInt32();
+              break;
+            }
+          }
+        }
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        throw e.setUnfinishedMessage(this);
+      } catch (java.io.IOException e) {
+        throw new com.google.protobuf.InvalidProtocolBufferException(
+            e.getMessage()).setUnfinishedMessage(this);
+      } finally {
+        this.unknownFields = unknownFields.build();
+        makeExtensionsImmutable();
+      }
+    }
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+    }
+
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable
+          .ensureFieldAccessorsInitialized(
+              org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.Builder.class);
+    }
+
+    public static com.google.protobuf.Parser<InstallSnapshot> PARSER =
+        new com.google.protobuf.AbstractParser<InstallSnapshot>() {
+      public InstallSnapshot parsePartialFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return new InstallSnapshot(input, extensionRegistry);
+      }
+    };
+
+    @java.lang.Override
+    public com.google.protobuf.Parser<InstallSnapshot> getParserForType() {
+      return PARSER;
+    }
+
+    private int bitField0_;
+    // optional int64 term = 1;
+    public static final int TERM_FIELD_NUMBER = 1;
+    private long term_;
+    /**
+     * <code>optional int64 term = 1;</code>
+     */
+    public boolean hasTerm() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>optional int64 term = 1;</code>
+     */
+    public long getTerm() {
+      return term_;
+    }
+
+    // optional string leaderId = 2;
+    public static final int LEADERID_FIELD_NUMBER = 2;
+    private java.lang.Object leaderId_;
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    public boolean hasLeaderId() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    public java.lang.String getLeaderId() {
+      java.lang.Object ref = leaderId_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          leaderId_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    public com.google.protobuf.ByteString
+        getLeaderIdBytes() {
+      java.lang.Object ref = leaderId_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        leaderId_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
+    // optional int64 lastIncludedIndex = 3;
+    public static final int LASTINCLUDEDINDEX_FIELD_NUMBER = 3;
+    private long lastIncludedIndex_;
+    /**
+     * <code>optional int64 lastIncludedIndex = 3;</code>
+     */
+    public boolean hasLastIncludedIndex() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    /**
+     * <code>optional int64 lastIncludedIndex = 3;</code>
+     */
+    public long getLastIncludedIndex() {
+      return lastIncludedIndex_;
+    }
+
+    // optional int64 lastIncludedTerm = 4;
+    public static final int LASTINCLUDEDTERM_FIELD_NUMBER = 4;
+    private long lastIncludedTerm_;
+    /**
+     * <code>optional int64 lastIncludedTerm = 4;</code>
+     */
+    public boolean hasLastIncludedTerm() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    /**
+     * <code>optional int64 lastIncludedTerm = 4;</code>
+     */
+    public long getLastIncludedTerm() {
+      return lastIncludedTerm_;
+    }
+
+    // optional bytes data = 5;
+    public static final int DATA_FIELD_NUMBER = 5;
+    private com.google.protobuf.ByteString data_;
+    /**
+     * <code>optional bytes data = 5;</code>
+     */
+    public boolean hasData() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    /**
+     * <code>optional bytes data = 5;</code>
+     */
+    public com.google.protobuf.ByteString getData() {
+      return data_;
+    }
+
+    // optional int32 chunkIndex = 6;
+    public static final int CHUNKINDEX_FIELD_NUMBER = 6;
+    private int chunkIndex_;
+    /**
+     * <code>optional int32 chunkIndex = 6;</code>
+     */
+    public boolean hasChunkIndex() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    /**
+     * <code>optional int32 chunkIndex = 6;</code>
+     */
+    public int getChunkIndex() {
+      return chunkIndex_;
+    }
+
+    // optional int32 totalChunks = 7;
+    public static final int TOTALCHUNKS_FIELD_NUMBER = 7;
+    private int totalChunks_;
+    /**
+     * <code>optional int32 totalChunks = 7;</code>
+     */
+    public boolean hasTotalChunks() {
+      return ((bitField0_ & 0x00000040) == 0x00000040);
+    }
+    /**
+     * <code>optional int32 totalChunks = 7;</code>
+     */
+    public int getTotalChunks() {
+      return totalChunks_;
+    }
+
+    private void initFields() {
+      term_ = 0L;
+      leaderId_ = "";
+      lastIncludedIndex_ = 0L;
+      lastIncludedTerm_ = 0L;
+      data_ = com.google.protobuf.ByteString.EMPTY;
+      chunkIndex_ = 0;
+      totalChunks_ = 0;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+
+      memoizedIsInitialized = 1;
+      return true;
+    }
+
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeInt64(1, term_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBytes(2, getLeaderIdBytes());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeInt64(3, lastIncludedIndex_);
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeInt64(4, lastIncludedTerm_);
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeBytes(5, data_);
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeInt32(6, chunkIndex_);
+      }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        output.writeInt32(7, totalChunks_);
+      }
+      getUnknownFields().writeTo(output);
+    }
+
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(1, term_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(2, getLeaderIdBytes());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(3, lastIncludedIndex_);
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(4, lastIncludedTerm_);
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(5, data_);
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(6, chunkIndex_);
+      }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(7, totalChunks_);
+      }
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input, extensionRegistry);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    /**
+     * Protobuf type {@code org.opendaylight.controller.cluster.raft.InstallSnapshot}
+     */
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshotOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+      }
+
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable
+            .ensureFieldAccessorsInitialized(
+                org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.Builder.class);
+      }
+
+      // Construct using org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+
+      private Builder(
+          com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+
+      public Builder clear() {
+        super.clear();
+        term_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        leaderId_ = "";
+        bitField0_ = (bitField0_ & ~0x00000002);
+        lastIncludedIndex_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000004);
+        lastIncludedTerm_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000008);
+        data_ = com.google.protobuf.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000010);
+        chunkIndex_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000020);
+        totalChunks_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000040);
+        return this;
+      }
+
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+      }
+
+      public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot getDefaultInstanceForType() {
+        return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance();
+      }
+
+      public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot build() {
+        org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+
+      public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot buildPartial() {
+        org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot result = new org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.term_ = term_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.leaderId_ = leaderId_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.lastIncludedIndex_ = lastIncludedIndex_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.lastIncludedTerm_ = lastIncludedTerm_;
+        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.data_ = data_;
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.chunkIndex_ = chunkIndex_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000040;
+        }
+        result.totalChunks_ = totalChunks_;
+        result.bitField0_ = to_bitField0_;
+        onBuilt();
+        return result;
+      }
+
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot) {
+          return mergeFrom((org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+
+      public Builder mergeFrom(org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot other) {
+        if (other == org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance()) return this;
+        if (other.hasTerm()) {
+          setTerm(other.getTerm());
+        }
+        if (other.hasLeaderId()) {
+          bitField0_ |= 0x00000002;
+          leaderId_ = other.leaderId_;
+          onChanged();
+        }
+        if (other.hasLastIncludedIndex()) {
+          setLastIncludedIndex(other.getLastIncludedIndex());
+        }
+        if (other.hasLastIncludedTerm()) {
+          setLastIncludedTerm(other.getLastIncludedTerm());
+        }
+        if (other.hasData()) {
+          setData(other.getData());
+        }
+        if (other.hasChunkIndex()) {
+          setChunkIndex(other.getChunkIndex());
+        }
+        if (other.hasTotalChunks()) {
+          setTotalChunks(other.getTotalChunks());
+        }
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+
+      public final boolean isInitialized() {
+        return true;
+      }
+
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parsedMessage = null;
+        try {
+          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+          parsedMessage = (org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot) e.getUnfinishedMessage();
+          throw e;
+        } finally {
+          if (parsedMessage != null) {
+            mergeFrom(parsedMessage);
+          }
+        }
+        return this;
+      }
+      private int bitField0_;
+
+      // optional int64 term = 1;
+      private long term_ ;
+      /**
+       * <code>optional int64 term = 1;</code>
+       */
+      public boolean hasTerm() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      /**
+       * <code>optional int64 term = 1;</code>
+       */
+      public long getTerm() {
+        return term_;
+      }
+      /**
+       * <code>optional int64 term = 1;</code>
+       */
+      public Builder setTerm(long value) {
+        bitField0_ |= 0x00000001;
+        term_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 term = 1;</code>
+       */
+      public Builder clearTerm() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        term_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // optional string leaderId = 2;
+      private java.lang.Object leaderId_ = "";
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public boolean hasLeaderId() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public java.lang.String getLeaderId() {
+        java.lang.Object ref = leaderId_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          leaderId_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public com.google.protobuf.ByteString
+          getLeaderIdBytes() {
+        java.lang.Object ref = leaderId_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          leaderId_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public Builder setLeaderId(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        leaderId_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public Builder clearLeaderId() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        leaderId_ = getDefaultInstance().getLeaderId();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public Builder setLeaderIdBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        leaderId_ = value;
+        onChanged();
+        return this;
+      }
+
+      // optional int64 lastIncludedIndex = 3;
+      private long lastIncludedIndex_ ;
+      /**
+       * <code>optional int64 lastIncludedIndex = 3;</code>
+       */
+      public boolean hasLastIncludedIndex() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      /**
+       * <code>optional int64 lastIncludedIndex = 3;</code>
+       */
+      public long getLastIncludedIndex() {
+        return lastIncludedIndex_;
+      }
+      /**
+       * <code>optional int64 lastIncludedIndex = 3;</code>
+       */
+      public Builder setLastIncludedIndex(long value) {
+        bitField0_ |= 0x00000004;
+        lastIncludedIndex_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 lastIncludedIndex = 3;</code>
+       */
+      public Builder clearLastIncludedIndex() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        lastIncludedIndex_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // optional int64 lastIncludedTerm = 4;
+      private long lastIncludedTerm_ ;
+      /**
+       * <code>optional int64 lastIncludedTerm = 4;</code>
+       */
+      public boolean hasLastIncludedTerm() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      /**
+       * <code>optional int64 lastIncludedTerm = 4;</code>
+       */
+      public long getLastIncludedTerm() {
+        return lastIncludedTerm_;
+      }
+      /**
+       * <code>optional int64 lastIncludedTerm = 4;</code>
+       */
+      public Builder setLastIncludedTerm(long value) {
+        bitField0_ |= 0x00000008;
+        lastIncludedTerm_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 lastIncludedTerm = 4;</code>
+       */
+      public Builder clearLastIncludedTerm() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        lastIncludedTerm_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // optional bytes data = 5;
+      private com.google.protobuf.ByteString data_ = com.google.protobuf.ByteString.EMPTY;
+      /**
+       * <code>optional bytes data = 5;</code>
+       */
+      public boolean hasData() {
+        return ((bitField0_ & 0x00000010) == 0x00000010);
+      }
+      /**
+       * <code>optional bytes data = 5;</code>
+       */
+      public com.google.protobuf.ByteString getData() {
+        return data_;
+      }
+      /**
+       * <code>optional bytes data = 5;</code>
+       */
+      public Builder setData(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000010;
+        data_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bytes data = 5;</code>
+       */
+      public Builder clearData() {
+        bitField0_ = (bitField0_ & ~0x00000010);
+        data_ = getDefaultInstance().getData();
+        onChanged();
+        return this;
+      }
+
+      // optional int32 chunkIndex = 6;
+      private int chunkIndex_ ;
+      /**
+       * <code>optional int32 chunkIndex = 6;</code>
+       */
+      public boolean hasChunkIndex() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      /**
+       * <code>optional int32 chunkIndex = 6;</code>
+       */
+      public int getChunkIndex() {
+        return chunkIndex_;
+      }
+      /**
+       * <code>optional int32 chunkIndex = 6;</code>
+       */
+      public Builder setChunkIndex(int value) {
+        bitField0_ |= 0x00000020;
+        chunkIndex_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 chunkIndex = 6;</code>
+       */
+      public Builder clearChunkIndex() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        chunkIndex_ = 0;
+        onChanged();
+        return this;
+      }
+
+      // optional int32 totalChunks = 7;
+      private int totalChunks_ ;
+      /**
+       * <code>optional int32 totalChunks = 7;</code>
+       */
+      public boolean hasTotalChunks() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      /**
+       * <code>optional int32 totalChunks = 7;</code>
+       */
+      public int getTotalChunks() {
+        return totalChunks_;
+      }
+      /**
+       * <code>optional int32 totalChunks = 7;</code>
+       */
+      public Builder setTotalChunks(int value) {
+        bitField0_ |= 0x00000040;
+        totalChunks_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 totalChunks = 7;</code>
+       */
+      public Builder clearTotalChunks() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        totalChunks_ = 0;
+        onChanged();
+        return this;
+      }
+
+      // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.cluster.raft.InstallSnapshot)
+    }
+
+    static {
+      defaultInstance = new InstallSnapshot(true);
+      defaultInstance.initFields();
+    }
+
+    // @@protoc_insertion_point(class_scope:org.opendaylight.controller.cluster.raft.InstallSnapshot)
+  }
+
+  private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable;
+
+  public static com.google.protobuf.Descriptors.FileDescriptor
+      getDescriptor() {
+    return descriptor;
+  }
+  private static com.google.protobuf.Descriptors.FileDescriptor
+      descriptor;
+  static {
+    java.lang.String[] descriptorData = {
+      "\n\025InstallSnapshot.proto\022(org.opendayligh" +
+      "t.controller.cluster.raft\"\235\001\n\017InstallSna" +
+      "pshot\022\014\n\004term\030\001 \001(\003\022\020\n\010leaderId\030\002 \001(\t\022\031\n" +
+      "\021lastIncludedIndex\030\003 \001(\003\022\030\n\020lastIncluded" +
+      "Term\030\004 \001(\003\022\014\n\004data\030\005 \001(\014\022\022\n\nchunkIndex\030\006" +
+      " \001(\005\022\023\n\013totalChunks\030\007 \001(\005BX\n;org.openday" +
+      "light.controller.cluster.raft.protobuff." +
+      "messagesB\027InstallSnapshotMessagesH\001"
+    };
+    com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
+      new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
+        public com.google.protobuf.ExtensionRegistry assignDescriptors(
+            com.google.protobuf.Descriptors.FileDescriptor root) {
+          descriptor = root;
+          internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor =
+            getDescriptor().getMessageTypes().get(0);
+          internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor,
+              new java.lang.String[] { "Term", "LeaderId", "LastIncludedIndex", "LastIncludedTerm", "Data", "ChunkIndex", "TotalChunks", });
+          return null;
+        }
+      };
+    com.google.protobuf.Descriptors.FileDescriptor
+      .internalBuildGeneratedFileFrom(descriptorData,
+        new com.google.protobuf.Descriptors.FileDescriptor[] {
+        }, assigner);
+  }
+
+  // @@protoc_insertion_point(outer_class_scope)
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/resources/InstallSnapshot.proto b/opendaylight/md-sal/sal-akka-raft/src/main/resources/InstallSnapshot.proto
new file mode 100644 (file)
index 0000000..14f821b
--- /dev/null
@@ -0,0 +1,15 @@
+package org.opendaylight.controller.cluster.raft;
+
+option java_package = "org.opendaylight.controller.cluster.raft.protobuff.messages";
+option java_outer_classname = "InstallSnapshotMessages";
+option optimize_for = SPEED;
+
+message InstallSnapshot {
+    optional int64 term = 1;
+    optional string leaderId = 2;
+    optional int64 lastIncludedIndex = 3;
+    optional int64 lastIncludedTerm = 4;
+    optional bytes data = 5;
+    optional int32 chunkIndex = 6;
+    optional int32 totalChunks = 7;
+}
index ea3c9e7..ca34a34 100644 (file)
@@ -14,17 +14,14 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
+import com.google.common.base.Preconditions;
 import com.google.protobuf.GeneratedMessage;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
-import com.google.common.base.Preconditions;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 public class MockRaftActorContext implements RaftActorContext {
@@ -37,6 +34,7 @@ public class MockRaftActorContext implements RaftActorContext {
     private final ElectionTerm electionTerm;
     private ReplicatedLog replicatedLog;
     private Map<String, String> peerAddresses = new HashMap();
+    private ConfigParams configParams;
 
     public MockRaftActorContext(){
         electionTerm = null;
@@ -79,6 +77,8 @@ public class MockRaftActorContext implements RaftActorContext {
             }
         };
 
+        configParams = new DefaultConfigParamsImpl();
+
         initReplicatedLog();
     }
 
@@ -179,118 +179,21 @@ public class MockRaftActorContext implements RaftActorContext {
 
     @Override
     public ConfigParams getConfigParams() {
-        return new DefaultConfigParamsImpl();
+        return configParams;
     }
 
-    public static class SimpleReplicatedLog implements ReplicatedLog {
-        private final List<ReplicatedLogEntry> log = new ArrayList<>();
-
-        @Override public ReplicatedLogEntry get(long index) {
-            if(index >= log.size() || index < 0){
-                return null;
-            }
-            return log.get((int) index);
-        }
-
-        @Override public ReplicatedLogEntry last() {
-            if(log.size() == 0){
-                return null;
-            }
-            return log.get(log.size()-1);
-        }
-
-        @Override public long lastIndex() {
-            if(log.size() == 0){
-                return -1;
-            }
-
-            return last().getIndex();
-        }
-
-        @Override public long lastTerm() {
-            if(log.size() == 0){
-                return -1;
-            }
-
-            return last().getTerm();
-        }
-
-        @Override public void removeFrom(long index) {
-            if(index >= log.size() || index < 0){
-                return;
-            }
-
-            log.subList((int) index, log.size()).clear();
-            //log.remove((int) index);
-        }
-
-        @Override public void removeFromAndPersist(long index) {
-            removeFrom(index);
-        }
-
-        @Override public void append(ReplicatedLogEntry replicatedLogEntry) {
-            log.add(replicatedLogEntry);
-        }
+    public void setConfigParams(ConfigParams configParams) {
+        this.configParams = configParams;
+    }
 
+    public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
         @Override public void appendAndPersist(
             ReplicatedLogEntry replicatedLogEntry) {
             append(replicatedLogEntry);
         }
 
-        @Override public List<ReplicatedLogEntry> getFrom(long index) {
-            if(index >= log.size() || index < 0){
-                return Collections.EMPTY_LIST;
-            }
-            List<ReplicatedLogEntry> entries = new ArrayList<>();
-            for(int i=(int) index ; i < log.size() ; i++) {
-                entries.add(get(i));
-            }
-            return entries;
-        }
-
-        @Override public List<ReplicatedLogEntry> getFrom(long index, int max) {
-            if(index >= log.size() || index < 0){
-                return Collections.EMPTY_LIST;
-            }
-            List<ReplicatedLogEntry> entries = new ArrayList<>();
-            int maxIndex = (int) index + max;
-            if(maxIndex > log.size()){
-                maxIndex = log.size();
-            }
-
-            for(int i=(int) index ; i < maxIndex ; i++) {
-                entries.add(get(i));
-            }
-            return entries;
-
-        }
-
-        @Override public long size() {
-            return log.size();
-        }
-
-        @Override public boolean isPresent(long index) {
-            if(index >= log.size() || index < 0){
-                return false;
-            }
-
-            return true;
-        }
-
-        @Override public boolean isInSnapshot(long index) {
-            return false;
-        }
-
-        @Override public Object getSnapshot() {
-            return null;
-        }
-
-        @Override public long getSnapshotIndex() {
-            return -1;
-        }
-
-        @Override public long getSnapshotTerm() {
-            return -1;
+        @Override public void removeFromAndPersist(long index) {
+            removeFrom(index);
         }
     }
 
index ff0ffeb..12123db 100644 (file)
@@ -6,6 +6,7 @@ import akka.actor.Props;
 import akka.event.Logging;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
+import com.google.protobuf.ByteString;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
@@ -39,11 +40,11 @@ public class RaftActorTest extends AbstractActorTest {
             Object data) {
         }
 
-        @Override protected Object createSnapshot() {
+        @Override protected void createSnapshot() {
             throw new UnsupportedOperationException("createSnapshot");
         }
 
-        @Override protected void applySnapshot(Object snapshot) {
+        @Override protected void applySnapshot(ByteString snapshot) {
             throw new UnsupportedOperationException("applySnapshot");
         }
 
index c5a81aa..227d1ef 100644 (file)
@@ -158,17 +158,18 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 createActorContext();
 
             context.setLastApplied(100);
-            setLastLogEntry((MockRaftActorContext) context, 0, 0, new MockRaftActorContext.MockPayload(""));
+            setLastLogEntry((MockRaftActorContext) context, 1, 100, new MockRaftActorContext.MockPayload(""));
+            ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99);
 
             List<ReplicatedLogEntry> entries =
                 Arrays.asList(
-                    (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(100, 101,
+                    (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101,
                         new MockRaftActorContext.MockPayload("foo"))
                 );
 
             // The new commitIndex is 101
             AppendEntries appendEntries =
-                new AppendEntries(100, "leader-1", 0, 0, entries, 101);
+                new AppendEntries(2, "leader-1", 100, 1, entries, 101);
 
             RaftState raftState =
                 createBehavior(context).handleMessage(getRef(), appendEntries);
index 17c22a1..73c9f96 100644 (file)
@@ -1,24 +1,40 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
-import junit.framework.Assert;
+import com.google.protobuf.ByteString;
+import org.junit.Assert;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
+import org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
@@ -82,8 +98,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     assertEquals("match", out);
 
                 }
-
-
             };
         }};
     }
@@ -194,18 +208,372 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     assertEquals("match", out);
 
                 }
+            };
+        }};
+    }
+
+    @Test
+    public void testSendInstallSnapshot() {
+        new LeaderTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+                    ActorRef followerActor = getTestActor();
+
+                    Map<String, String> peerAddresses = new HashMap();
+                    peerAddresses.put(followerActor.path().toString(),
+                        followerActor.path().toString());
+
+
+                    MockRaftActorContext actorContext =
+                        (MockRaftActorContext) createActorContext(getRef());
+                    actorContext.setPeerAddresses(peerAddresses);
+
+
+                    Map<String, String> leadersSnapshot = new HashMap<>();
+                    leadersSnapshot.put("1", "A");
+                    leadersSnapshot.put("2", "B");
+                    leadersSnapshot.put("3", "C");
+
+                    //clears leaders log
+                    actorContext.getReplicatedLog().removeFrom(0);
+
+                    final int followersLastIndex = 2;
+                    final int snapshotIndex = 3;
+                    final int newEntryIndex = 4;
+                    final int snapshotTerm = 1;
+                    final int currentTerm = 2;
+
+                    // set the snapshot variables in replicatedlog
+                    actorContext.getReplicatedLog().setSnapshot(
+                        toByteString(leadersSnapshot));
+                    actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+                    actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+
+                    MockLeader leader = new MockLeader(actorContext);
+                    // set the follower info in leader
+                    leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
+
+                    // new entry
+                    ReplicatedLogImplEntry entry =
+                        new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                            new MockRaftActorContext.MockPayload("D"));
+
+                    // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
+                    RaftState raftState = leader.handleMessage(
+                        senderActor, new Replicate(null, "state-id", entry));
+
+                    assertEquals(RaftState.Leader, raftState);
+
+                    // we might receive some heartbeat messages, so wait till we SendInstallSnapshot
+                    Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
+                        @Override
+                        protected Boolean match(Object o) throws Exception {
+                            if (o instanceof SendInstallSnapshot) {
+                                return true;
+                            }
+                            return false;
+                        }
+                    }.get();
+
+                    boolean sendInstallSnapshotReceived = false;
+                    for (Boolean b: matches) {
+                        sendInstallSnapshotReceived = b | sendInstallSnapshotReceived;
+                    }
+
+                    assertTrue(sendInstallSnapshotReceived);
+
+                }
+            };
+        }};
+    }
+
+    @Test
+    public void testInstallSnapshot() {
+        new LeaderTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+                    ActorRef followerActor = getTestActor();
+
+                    Map<String, String> peerAddresses = new HashMap();
+                    peerAddresses.put(followerActor.path().toString(),
+                        followerActor.path().toString());
+
+                    MockRaftActorContext actorContext =
+                        (MockRaftActorContext) createActorContext();
+                    actorContext.setPeerAddresses(peerAddresses);
+
+
+                    Map<String, String> leadersSnapshot = new HashMap<>();
+                    leadersSnapshot.put("1", "A");
+                    leadersSnapshot.put("2", "B");
+                    leadersSnapshot.put("3", "C");
+
+                    //clears leaders log
+                    actorContext.getReplicatedLog().removeFrom(0);
+
+                    final int followersLastIndex = 2;
+                    final int snapshotIndex = 3;
+                    final int newEntryIndex = 4;
+                    final int snapshotTerm = 1;
+                    final int currentTerm = 2;
+
+                    // set the snapshot variables in replicatedlog
+                    actorContext.getReplicatedLog().setSnapshot(toByteString(leadersSnapshot));
+                    actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+                    actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+
+                    actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+                    MockLeader leader = new MockLeader(actorContext);
+                    // set the follower info in leader
+                    leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
+
+                    // new entry
+                    ReplicatedLogImplEntry entry =
+                        new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                            new MockRaftActorContext.MockPayload("D"));
+
+
+                    RaftState raftState = leader.handleMessage(senderActor, new SendInstallSnapshot());
+
+                    assertEquals(RaftState.Leader, raftState);
+
+                    // check if installsnapshot gets called with the correct values.
+                    final String out =
+                        new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+                            // do not put code outside this method, will run afterwards
+                            protected String match(Object in) {
+                                if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
+                                    InstallSnapshot is = (InstallSnapshot)
+                                        SerializationUtils.fromSerializable(in);
+                                    if (is.getData() == null) {
+                                        return "InstallSnapshot data is null";
+                                    }
+                                    if (is.getLastIncludedIndex() != snapshotIndex) {
+                                        return is.getLastIncludedIndex() + "!=" + snapshotIndex;
+                                    }
+                                    if (is.getLastIncludedTerm() != snapshotTerm) {
+                                        return is.getLastIncludedTerm() + "!=" + snapshotTerm;
+                                    }
+                                    if (is.getTerm() == currentTerm) {
+                                        return is.getTerm() + "!=" + currentTerm;
+                                    }
+
+                                    return "match";
+
+                               } else {
+                                    return "message mismatch:" + in.getClass();
+                                }
+                            }
+                        }.get(); // this extracts the received message
+
+                    assertEquals("match", out);
+                }
+            };
+        }};
+    }
+
+    @Test
+    public void testHandleInstallSnapshotReplyLastChunk() {
+        new LeaderTestKit(getSystem()) {{
+            new Within(duration("1 seconds")) {
+                protected void run() {
+                    ActorRef followerActor = getTestActor();
+
+                    Map<String, String> peerAddresses = new HashMap();
+                    peerAddresses.put(followerActor.path().toString(),
+                        followerActor.path().toString());
+
+                    MockRaftActorContext actorContext =
+                        (MockRaftActorContext) createActorContext();
+                    actorContext.setPeerAddresses(peerAddresses);
+
+                    final int followersLastIndex = 2;
+                    final int snapshotIndex = 3;
+                    final int newEntryIndex = 4;
+                    final int snapshotTerm = 1;
+                    final int currentTerm = 2;
+
+                    MockLeader leader = new MockLeader(actorContext);
+                    // set the follower info in leader
+                    leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
+
+                    Map<String, String> leadersSnapshot = new HashMap<>();
+                    leadersSnapshot.put("1", "A");
+                    leadersSnapshot.put("2", "B");
+                    leadersSnapshot.put("3", "C");
+
+                    // set the snapshot variables in replicatedlog
+                    actorContext.getReplicatedLog().setSnapshot(
+                        toByteString(leadersSnapshot));
+                    actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+                    actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+                    actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+                    ByteString bs = toByteString(leadersSnapshot);
+                    leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
+                    while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
+                        leader.getFollowerToSnapshot().getNextChunk();
+                        leader.getFollowerToSnapshot().incrementChunkIndex();
+                    }
+
+                    //clears leaders log
+                    actorContext.getReplicatedLog().removeFrom(0);
 
+                    RaftState raftState = leader.handleMessage(senderActor,
+                        new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
+                            leader.getFollowerToSnapshot().getChunkIndex(), true));
 
+                    assertEquals(RaftState.Leader, raftState);
+
+                    assertEquals(leader.mapFollowerToSnapshot.size(), 0);
+                    assertEquals(leader.followerToLog.size(), 1);
+                    assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
+                    FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
+                    assertEquals(snapshotIndex, fli.getMatchIndex().get());
+                    assertEquals(snapshotIndex, fli.getMatchIndex().get());
+                    assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
+                }
             };
         }};
     }
 
+    @Test
+    public void testFollowerToSnapshotLogic() {
+
+        MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
+
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public int getSnapshotChunkSize() {
+                return 50;
+            }
+        });
+
+        MockLeader leader = new MockLeader(actorContext);
+
+        Map<String, String> leadersSnapshot = new HashMap<>();
+        leadersSnapshot.put("1", "A");
+        leadersSnapshot.put("2", "B");
+        leadersSnapshot.put("3", "C");
+
+        ByteString bs = toByteString(leadersSnapshot);
+        byte[] barray = bs.toByteArray();
+
+        leader.createFollowerToSnapshot("followerId", bs);
+        assertEquals(bs.size(), barray.length);
+
+        int chunkIndex=0;
+        for (int i=0; i < barray.length; i = i + 50) {
+            int j = i + 50;
+            chunkIndex++;
+
+            if (i + 50 > barray.length) {
+                j = barray.length;
+            }
+
+            ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
+            assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
+            assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
+
+            leader.getFollowerToSnapshot().markSendStatus(true);
+            if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
+                leader.getFollowerToSnapshot().incrementChunkIndex();
+            }
+        }
+
+        assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
+    }
+
+
     @Override protected RaftActorBehavior createBehavior(
         RaftActorContext actorContext) {
         return new Leader(actorContext);
     }
 
     @Override protected RaftActorContext createActorContext() {
-        return new MockRaftActorContext("test", getSystem(), leaderActor);
+        return createActorContext(leaderActor);
+    }
+
+    protected RaftActorContext createActorContext(ActorRef actorRef) {
+        return new MockRaftActorContext("test", getSystem(), actorRef);
+    }
+
+    private ByteString toByteString(Map<String, String> state) {
+        ByteArrayOutputStream b = null;
+        ObjectOutputStream o = null;
+        try {
+            try {
+                b = new ByteArrayOutputStream();
+                o = new ObjectOutputStream(b);
+                o.writeObject(state);
+                byte[] snapshotBytes = b.toByteArray();
+                return ByteString.copyFrom(snapshotBytes);
+            } finally {
+                if (o != null) {
+                    o.flush();
+                    o.close();
+                }
+                if (b != null) {
+                    b.close();
+                }
+            }
+        } catch (IOException e) {
+            Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
+        }
+        return null;
+    }
+
+    private static class LeaderTestKit extends JavaTestKit {
+
+        private LeaderTestKit(ActorSystem actorSystem) {
+            super(actorSystem);
+        }
+
+        protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
+            // Wait for a specific log message to show up
+            final boolean result =
+            new JavaTestKit.EventFilter<Boolean>(logLevel
+            ) {
+                @Override
+                protected Boolean run() {
+                    return true;
+                }
+            }.from(subject.path().toString())
+                .message(logMessage)
+                .occurrences(1).exec();
+
+            Assert.assertEquals(true, result);
+
+        }
+    }
+
+    class MockLeader extends Leader {
+
+        FollowerToSnapshot fts;
+
+        public MockLeader(RaftActorContext context){
+            super(context);
+        }
+
+        public void addToFollowerToLog(String followerId, long nextIndex, long matchIndex) {
+            FollowerLogInformation followerLogInformation =
+                new FollowerLogInformationImpl(followerId,
+                    new AtomicLong(nextIndex),
+                    new AtomicLong(matchIndex));
+            followerToLog.put(followerId, followerLogInformation);
+        }
+
+        public FollowerToSnapshot getFollowerToSnapshot() {
+            return fts;
+        }
+
+        public void createFollowerToSnapshot(String followerId, ByteString bs ) {
+            fts = new FollowerToSnapshot(bs);
+            mapFollowerToSnapshot.put(followerId, fts);
+
+        }
     }
 }
index b8980cd..a3619ec 100644 (file)
       </dependency>
   </dependencies>
   <build>
+
       <plugins>
           <plugin>
               <groupId>org.jacoco</groupId>
                   </execution>
               </executions>
           </plugin>
-      </plugins>
+          <plugin>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>maven-bundle-plugin</artifactId>
+            <extensions>true</extensions>
+            <configuration>
+            <instructions>
+                <Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
+                <Export-Package>org.opendaylight.controller.cluster.*,org.opendaylight.common.actor,org.opendaylight.common.reporting,org.opendaylight.controller.protobuff.*,org.opendaylight.controller.xml.*</Export-Package>
+                <Import-Package>*</Import-Package>
+            </instructions>
+            </configuration>
+          </plugin>
+    </plugins>
   </build>
-
 </project>
index fcf1f45..c867fce 100644 (file)
@@ -926,6 +926,16 @@ public final class PersistentMessages {
      */
     org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages.ModificationOrBuilder getModificationOrBuilder(
         int index);
+
+    // optional int64 timeStamp = 2;
+    /**
+     * <code>optional int64 timeStamp = 2;</code>
+     */
+    boolean hasTimeStamp();
+    /**
+     * <code>optional int64 timeStamp = 2;</code>
+     */
+    long getTimeStamp();
   }
   /**
    * Protobuf type {@code org.opendaylight.controller.mdsal.CompositeModification}
@@ -986,6 +996,11 @@ public final class PersistentMessages {
               modification_.add(input.readMessage(org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages.Modification.PARSER, extensionRegistry));
               break;
             }
+            case 16: {
+              bitField0_ |= 0x00000001;
+              timeStamp_ = input.readInt64();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -1028,6 +1043,7 @@ public final class PersistentMessages {
       return PARSER;
     }
 
+    private int bitField0_;
     // repeated .org.opendaylight.controller.mdsal.Modification modification = 1;
     public static final int MODIFICATION_FIELD_NUMBER = 1;
     private java.util.List<org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages.Modification> modification_;
@@ -1064,8 +1080,25 @@ public final class PersistentMessages {
       return modification_.get(index);
     }
 
+    // optional int64 timeStamp = 2;
+    public static final int TIMESTAMP_FIELD_NUMBER = 2;
+    private long timeStamp_;
+    /**
+     * <code>optional int64 timeStamp = 2;</code>
+     */
+    public boolean hasTimeStamp() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>optional int64 timeStamp = 2;</code>
+     */
+    public long getTimeStamp() {
+      return timeStamp_;
+    }
+
     private void initFields() {
       modification_ = java.util.Collections.emptyList();
+      timeStamp_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -1088,6 +1121,9 @@ public final class PersistentMessages {
       for (int i = 0; i < modification_.size(); i++) {
         output.writeMessage(1, modification_.get(i));
       }
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeInt64(2, timeStamp_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -1101,6 +1137,10 @@ public final class PersistentMessages {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(1, modification_.get(i));
       }
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(2, timeStamp_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1224,6 +1264,8 @@ public final class PersistentMessages {
         } else {
           modificationBuilder_.clear();
         }
+        timeStamp_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000002);
         return this;
       }
 
@@ -1251,6 +1293,7 @@ public final class PersistentMessages {
       public org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages.CompositeModification buildPartial() {
         org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages.CompositeModification result = new org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages.CompositeModification(this);
         int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
         if (modificationBuilder_ == null) {
           if (((bitField0_ & 0x00000001) == 0x00000001)) {
             modification_ = java.util.Collections.unmodifiableList(modification_);
@@ -1260,6 +1303,11 @@ public final class PersistentMessages {
         } else {
           result.modification_ = modificationBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.timeStamp_ = timeStamp_;
+        result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
       }
@@ -1301,6 +1349,9 @@ public final class PersistentMessages {
             }
           }
         }
+        if (other.hasTimeStamp()) {
+          setTimeStamp(other.getTimeStamp());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1574,6 +1625,39 @@ public final class PersistentMessages {
         return modificationBuilder_;
       }
 
+      // optional int64 timeStamp = 2;
+      private long timeStamp_ ;
+      /**
+       * <code>optional int64 timeStamp = 2;</code>
+       */
+      public boolean hasTimeStamp() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      /**
+       * <code>optional int64 timeStamp = 2;</code>
+       */
+      public long getTimeStamp() {
+        return timeStamp_;
+      }
+      /**
+       * <code>optional int64 timeStamp = 2;</code>
+       */
+      public Builder setTimeStamp(long value) {
+        bitField0_ |= 0x00000002;
+        timeStamp_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 timeStamp = 2;</code>
+       */
+      public Builder clearTimeStamp() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        timeStamp_ = 0L;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.CompositeModification)
     }
 
@@ -1610,11 +1694,12 @@ public final class PersistentMessages {
       "e\030\001 \002(\t\022C\n\004path\030\002 \002(\01325.org.opendaylight" +
       ".controller.mdsal.InstanceIdentifier\0225\n\004" +
       "data\030\003 \001(\0132\'.org.opendaylight.controller" +
-      ".mdsal.Node\"^\n\025CompositeModification\022E\n\014" +
+      ".mdsal.Node\"q\n\025CompositeModification\022E\n\014" +
       "modification\030\001 \003(\0132/.org.opendaylight.co" +
-      "ntroller.mdsal.ModificationBO\n9org.opend" +
-      "aylight.controller.protobuff.messages.pe",
-      "rsistentB\022PersistentMessages"
+      "ntroller.mdsal.Modification\022\021\n\ttimeStamp" +
+      "\030\002 \001(\003BO\n9org.opendaylight.controller.pr",
+      "otobuff.messages.persistentB\022PersistentM" +
+      "essages"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -1632,7 +1717,7 @@ public final class PersistentMessages {
           internal_static_org_opendaylight_controller_mdsal_CompositeModification_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_org_opendaylight_controller_mdsal_CompositeModification_descriptor,
-              new java.lang.String[] { "Modification", });
+              new java.lang.String[] { "Modification", "TimeStamp", });
           return null;
         }
       };
index 5848561..ea8f4a3 100644 (file)
@@ -75,9 +75,13 @@ public class XmlUtils {
    */
   public static String inputCompositeNodeToXml(CompositeNode cNode, SchemaContext schemaContext){
     LOG.debug("Converting input composite node to xml {}", cNode);
-    if (cNode == null) return BLANK;
+    if (cNode == null) {
+        return BLANK;
+    }
 
-    if(schemaContext == null) return BLANK;
+    if(schemaContext == null) {
+        return BLANK;
+    }
 
     Document domTree = null;
     try {
@@ -108,9 +112,13 @@ public class XmlUtils {
    */
   public static String outputCompositeNodeToXml(CompositeNode cNode, SchemaContext schemaContext){
     LOG.debug("Converting output composite node to xml {}", cNode);
-    if (cNode == null) return BLANK;
+    if (cNode == null) {
+        return BLANK;
+    }
 
-    if(schemaContext == null) return BLANK;
+    if(schemaContext == null) {
+        return BLANK;
+    }
 
     Document domTree = null;
     try {
@@ -150,7 +158,9 @@ public class XmlUtils {
   }
 
   public static CompositeNode xmlToCompositeNode(String xml){
-    if (xml==null || xml.length()==0) return null;
+    if (xml==null || xml.length()==0) {
+        return null;
+    }
 
     Node<?> dataTree;
     try {
@@ -179,11 +189,17 @@ public class XmlUtils {
    */
   public static CompositeNode inputXmlToCompositeNode(QName rpc, String xml,  SchemaContext schemaContext){
     LOG.debug("Converting input xml to composite node {}", xml);
-    if (xml==null || xml.length()==0) return null;
+    if (xml==null || xml.length()==0) {
+        return null;
+    }
 
-    if(rpc == null) return null;
+    if(rpc == null) {
+        return null;
+    }
 
-    if(schemaContext == null) return null;
+    if(schemaContext == null) {
+        return null;
+    }
 
     CompositeNode compositeNode = null;
     try {
@@ -213,7 +229,7 @@ public class XmlUtils {
           LOG.debug("Converted xml input to list of nodes  {}", dataNodes);
 
           final CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder();
-          it.setQName(input);
+          it.setQName(rpc);
           it.add(ImmutableCompositeNode.create(input, dataNodes));
           compositeNode = it.toInstance();
           break;
index 8e83449..72651ab 100644 (file)
@@ -11,10 +11,12 @@ message Modification {
     required string type=1;
     required InstanceIdentifier path=2;
     optional Node data=3;
+
 }
 
 
 message CompositeModification {
     repeated Modification modification=1;
+    optional int64 timeStamp = 2;
 }
 
index 31b658d..91c0b5c 100644 (file)
                   <type>xml</type>
                   <classifier>config</classifier>
                 </artifact>
+                <artifact>
+                  <file>${project.build.directory}/classes/initial/akka.conf</file>
+                  <type>xml</type>
+                  <classifier>akkaconf</classifier>
+                </artifact>
+                <artifact>
+                  <file>${project.build.directory}/classes/initial/module-shards.conf</file>
+                  <type>xml</type>
+                  <classifier>moduleshardconf</classifier>
+                </artifact>
+                <artifact>
+                  <file>${project.build.directory}/classes/initial/modules.conf</file>
+                  <type>xml</type>
+                  <classifier>moduleconf</classifier>
+                </artifact>
               </artifacts>
             </configuration>
           </execution>
index 0532213..5a2116b 100644 (file)
@@ -21,7 +21,7 @@ odl-cluster-data {
     remote {
       log-remote-lifecycle-events = off
       netty.tcp {
-        hostname = "<CHANGE_ME>"
+        hostname = "127.0.0.1"
         port = 2550
         maximum-frame-size = 419430400
         send-buffer-size = 52428800
@@ -30,9 +30,14 @@ odl-cluster-data {
     }
 
     cluster {
-      seed-nodes = ["akka.tcp://opendaylight-cluster-data@<CHANGE_SEED_IP>:2550"]
+      seed-nodes = ["akka.tcp://opendaylight-cluster-data@127.0.0.1:2550"]
 
       auto-down-unreachable-after = 10s
+
+      roles = [
+        "member-1"
+      ]
+
     }
   }
 }
@@ -51,13 +56,13 @@ odl-cluster-rpc {
     remote {
       log-remote-lifecycle-events = off
       netty.tcp {
-        hostname = "<CHANGE_ME>"
+        hostname = "127.0.0.1"
         port = 2551
       }
     }
 
     cluster {
-      seed-nodes = ["akka.tcp://opendaylight-cluster-rpc@<CHANGE_SEED_IP>:2551"]
+      seed-nodes = ["akka.tcp://opendaylight-cluster-rpc@127.0.0.1:2551"]
 
       auto-down-unreachable-after = 10s
     }
index dd5a7f2..8299822 100644 (file)
       <artifactId>akka-slf4j_${scala.version}</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-osgi_${scala.version}</artifactId>
+    </dependency>
+
     <!-- SAL Dependencies -->
 
     <dependency>
             <Export-package></Export-package>
             <Private-Package></Private-Package>
             <Import-Package>!*snappy;!org.jboss.*;!com.jcraft.*;!*jetty*;!sun.security.*;*</Import-Package>
+            <!--
             <Embed-Dependency>
                 sal-clustering-commons;
                 sal-akka-raft;
                 *scala*;
             </Embed-Dependency>
             <Embed-Transitive>true</Embed-Transitive>
+            -->
           </instructions>
         </configuration>
       </plugin>
index 15c0548..b326d61 100644 (file)
@@ -10,24 +10,55 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSystem;
 import akka.actor.Props;
-import com.google.common.base.Function;
+import akka.osgi.BundleDelegatingClassLoader;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.osgi.framework.BundleContext;
 
-import javax.annotation.Nullable;
+import java.io.File;
 
 public class ActorSystemFactory {
-    private static final ActorSystem actorSystem = (new Function<Void, ActorSystem>(){
-
-        @Nullable @Override public ActorSystem apply(@Nullable Void aVoid) {
-                ActorSystem system =
-                    ActorSystem.create("opendaylight-cluster-data", ConfigFactory
-                        .load().getConfig("odl-cluster-data"));
-                system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
-                return system;
-        }
-    }).apply(null);
+
+    public static final String AKKA_CONF_PATH = "./configuration/initial/akka.conf";
+    public static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data";
+    public static final String CONFIGURATION_NAME = "odl-cluster-data";
+
+    private static volatile ActorSystem actorSystem = null;
 
     public static final ActorSystem getInstance(){
         return actorSystem;
     }
+
+    /**
+     * This method should be called only once during initialization
+     *
+     * @param bundleContext
+     */
+    public static final ActorSystem createInstance(final BundleContext bundleContext) {
+        if(actorSystem == null) {
+            // Create an OSGi bundle classloader for actor system
+            BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
+                Thread.currentThread().getContextClassLoader());
+            synchronized (ActorSystemFactory.class) {
+                // Double check
+
+                if (actorSystem == null) {
+                    ActorSystem system = ActorSystem.create(ACTOR_SYSTEM_NAME,
+                        ConfigFactory.load(readAkkaConfiguration()).getConfig(CONFIGURATION_NAME), classLoader);
+                    system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
+                    actorSystem = system;
+                }
+            }
+        }
+
+        return actorSystem;
+    }
+
+
+    private static final Config readAkkaConfiguration(){
+        File defaultConfigFile = new File(AKKA_CONF_PATH);
+        Preconditions.checkState(defaultConfigFile.exists(), "akka.conf is missing");
+        return ConfigFactory.parseFile(defaultConfigFile);
+    }
 }
index af8a987..1021dde 100644 (file)
@@ -9,13 +9,15 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
+
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
+
 import scala.concurrent.duration.Duration;
 
 import java.util.concurrent.TimeUnit;
 
 /**
- * Contains contextual data for shards.
+ * Contains contextual data for a data store.
  *
  * @author Thomas Pantelis
  */
@@ -23,16 +25,24 @@ public class DatastoreContext {
 
     private final InMemoryDOMDataStoreConfigProperties dataStoreProperties;
     private final Duration shardTransactionIdleTimeout;
+    private final int operationTimeoutInSeconds;
+    private final String dataStoreMXBeanType;
 
     public DatastoreContext() {
         this.dataStoreProperties = null;
+        this.dataStoreMXBeanType = "DistributedDatastore";
         this.shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES);
+        this.operationTimeoutInSeconds = 5;
     }
 
-    public DatastoreContext(InMemoryDOMDataStoreConfigProperties dataStoreProperties,
-        Duration shardTransactionIdleTimeout) {
+    public DatastoreContext(String dataStoreMXBeanType,
+            InMemoryDOMDataStoreConfigProperties dataStoreProperties,
+            Duration shardTransactionIdleTimeout,
+            int operationTimeoutInSeconds) {
+        this.dataStoreMXBeanType = dataStoreMXBeanType;
         this.dataStoreProperties = Preconditions.checkNotNull(dataStoreProperties);
-        this.shardTransactionIdleTimeout = Preconditions.checkNotNull(shardTransactionIdleTimeout);
+        this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
+        this.operationTimeoutInSeconds = operationTimeoutInSeconds;
     }
 
     public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
@@ -43,5 +53,11 @@ public class DatastoreContext {
         return shardTransactionIdleTimeout;
     }
 
+    public String getDataStoreMXBeanType() {
+        return dataStoreMXBeanType;
+    }
 
+    public int getOperationTimeoutInSeconds() {
+        return operationTimeoutInSeconds;
+    }
 }
index 0a137e0..db01d51 100644 (file)
@@ -8,8 +8,6 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
-import java.util.concurrent.TimeUnit;
-
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 
@@ -22,7 +20,6 @@ import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
@@ -36,8 +33,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.concurrent.duration.Duration;
-
 /**
  *
  */
@@ -46,42 +41,30 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
 
     private final ActorContext actorContext;
-    private final DatastoreContext datastoreContext;
 
     public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster,
-            Configuration configuration, DistributedDataStoreProperties dataStoreProperties) {
+            Configuration configuration, DatastoreContext datastoreContext) {
         Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
         Preconditions.checkNotNull(type, "type should not be null");
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
-
+        Preconditions.checkNotNull(datastoreContext, "datastoreContext should not be null");
 
         String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString();
 
         LOG.info("Creating ShardManager : {}", shardManagerId);
 
-        datastoreContext = new DatastoreContext(InMemoryDOMDataStoreConfigProperties.create(
-                dataStoreProperties.getMaxShardDataChangeExecutorPoolSize(),
-                dataStoreProperties.getMaxShardDataChangeExecutorQueueSize(),
-                dataStoreProperties.getMaxShardDataChangeListenerQueueSize()),
-                Duration.create(dataStoreProperties.getShardTransactionIdleTimeoutInMinutes(),
-                        TimeUnit.MINUTES));
+        actorContext = new ActorContext(actorSystem, actorSystem.actorOf(
+                ShardManager.props(type, cluster, configuration, datastoreContext)
+                    .withMailbox(ActorContext.MAILBOX), shardManagerId ), cluster, configuration);
 
-        actorContext
-                = new ActorContext(
-                    actorSystem, actorSystem.actorOf(
-                        ShardManager.props(type, cluster, configuration, datastoreContext).
-                            withMailbox(ActorContext.MAILBOX), shardManagerId ), cluster, configuration);
-
-        actorContext.setOperationTimeout(dataStoreProperties.getOperationTimeoutInSeconds());
+        actorContext.setOperationTimeout(datastoreContext.getOperationTimeoutInSeconds());
     }
 
     public DistributedDataStore(ActorContext actorContext) {
         this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
-        this.datastoreContext = new DatastoreContext();
     }
 
-
     @SuppressWarnings("unchecked")
     @Override
     public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
index 65a39a6..8739ed1 100644 (file)
@@ -12,16 +12,17 @@ import akka.actor.ActorSystem;
 
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.sal.core.api.model.SchemaService;
+import org.osgi.framework.BundleContext;
 
 public class DistributedDataStoreFactory {
     public static DistributedDataStore createInstance(String name, SchemaService schemaService,
-            DistributedDataStoreProperties dataStoreProperties) {
+            DatastoreContext datastoreContext, BundleContext bundleContext) {
 
-        ActorSystem actorSystem = ActorSystemFactory.getInstance();
+        ActorSystem actorSystem = ActorSystemFactory.createInstance(bundleContext);
         Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
         final DistributedDataStore dataStore =
             new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),
-                    config, dataStoreProperties );
+                    config, datastoreContext );
         ShardStrategyFactory.setConfiguration(config);
         schemaService.registerSchemaContextListener(dataStore);
         return dataStore;
index df3245f..e69de29 100644 (file)
@@ -1,60 +0,0 @@
-/*
- * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-/**
- * Wrapper class for DistributedDataStore configuration properties.
- *
- * @author Thomas Pantelis
- */
-public class DistributedDataStoreProperties {
-    private final int maxShardDataChangeListenerQueueSize;
-    private final int maxShardDataChangeExecutorQueueSize;
-    private final int maxShardDataChangeExecutorPoolSize;
-    private final int shardTransactionIdleTimeoutInMinutes;
-    private final int operationTimeoutInSeconds;
-
-    public DistributedDataStoreProperties() {
-        maxShardDataChangeListenerQueueSize = 1000;
-        maxShardDataChangeExecutorQueueSize = 1000;
-        maxShardDataChangeExecutorPoolSize = 20;
-        shardTransactionIdleTimeoutInMinutes = 10;
-        operationTimeoutInSeconds = 5;
-    }
-
-    public DistributedDataStoreProperties(int maxShardDataChangeListenerQueueSize,
-            int maxShardDataChangeExecutorQueueSize, int maxShardDataChangeExecutorPoolSize,
-            int shardTransactionIdleTimeoutInMinutes, int operationTimeoutInSeconds) {
-        this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
-        this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
-        this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
-        this.shardTransactionIdleTimeoutInMinutes = shardTransactionIdleTimeoutInMinutes;
-        this.operationTimeoutInSeconds = operationTimeoutInSeconds;
-    }
-
-    public int getMaxShardDataChangeListenerQueueSize() {
-        return maxShardDataChangeListenerQueueSize;
-    }
-
-    public int getMaxShardDataChangeExecutorQueueSize() {
-        return maxShardDataChangeExecutorQueueSize;
-    }
-
-    public int getMaxShardDataChangeExecutorPoolSize() {
-        return maxShardDataChangeExecutorPoolSize;
-    }
-
-    public int getShardTransactionIdleTimeoutInMinutes() {
-        return shardTransactionIdleTimeoutInMinutes;
-    }
-
-    public int getOperationTimeoutInSeconds() {
-        return operationTimeoutInSeconds;
-    }
-}
index 43a9faa..7d57004 100644 (file)
@@ -10,18 +10,22 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
+import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Creator;
+import akka.persistence.RecoveryFailure;
 import akka.serialization.Serialization;
-
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
@@ -34,30 +38,35 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionR
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
+import org.opendaylight.controller.cluster.datastore.messages.ReadData;
+import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
 import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -101,12 +110,15 @@ public class Shard extends RaftActor {
 
     private SchemaContext schemaContext;
 
+    private ActorRef createSnapshotTransaction;
+
     private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
-            DatastoreContext datastoreContext) {
+            DatastoreContext datastoreContext, SchemaContext schemaContext) {
         super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams));
 
         this.name = name;
         this.datastoreContext = datastoreContext;
+        this.schemaContext = schemaContext;
 
         String setting = System.getProperty("shard.persistent");
 
@@ -117,8 +129,14 @@ public class Shard extends RaftActor {
         store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
                 datastoreContext.getDataStoreProperties());
 
-        shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString());
+        if(schemaContext != null) {
+            store.onGlobalContextUpdated(schemaContext);
+        }
 
+        shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
+                datastoreContext.getDataStoreMXBeanType());
+        shardMBean.setDataStoreExecutor(store.getDomStoreExecutor());
+        shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
 
     }
 
@@ -136,16 +154,28 @@ public class Shard extends RaftActor {
 
     public static Props props(final ShardIdentifier name,
         final Map<ShardIdentifier, String> peerAddresses,
-        DatastoreContext datastoreContext) {
+        DatastoreContext datastoreContext, SchemaContext schemaContext) {
         Preconditions.checkNotNull(name, "name should not be null");
         Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
-        Preconditions.checkNotNull(datastoreContext, "shardContext should not be null");
+        Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
+        Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
 
-        return Props.create(new ShardCreator(name, peerAddresses, datastoreContext));
+        return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
+    }
+
+    @Override public void onReceiveRecover(Object message) {
+        LOG.debug("onReceiveRecover: Received message {} from {}", message.getClass().toString(),
+            getSender());
+
+        if (message instanceof RecoveryFailure){
+            LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
+        } else {
+            super.onReceiveRecover(message);
+        }
     }
 
     @Override public void onReceiveCommand(Object message) {
-        LOG.debug("Received message {} from {}", message.getClass().toString(),
+        LOG.debug("onReceiveCommand: Received message {} from {}", message.getClass().toString(),
             getSender());
 
         if (message.getClass()
@@ -155,6 +185,15 @@ public class Shard extends RaftActor {
             } else if (getLeader() != null) {
                 getLeader().forward(message, getContext());
             }
+        } else if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
+            // This must be for install snapshot. Don't want to open this up and trigger
+            // deSerialization
+            self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)), self());
+
+            // Send a PoisonPill instead of sending close transaction because we do not really need
+            // a response
+            getSender().tell(PoisonPill.getInstance(), self());
+
         } else if (message instanceof RegisterChangeListener) {
             registerChangeListener((RegisterChangeListener) message);
         } else if (message instanceof UpdateSchemaContext) {
@@ -178,59 +217,79 @@ public class Shard extends RaftActor {
     }
 
     private ActorRef createTypedTransactionActor(
-        CreateTransaction createTransaction,
+        int transactionType,
         ShardTransactionIdentifier transactionId) {
-        if (createTransaction.getTransactionType()
+
+        if(this.schemaContext == null){
+            throw new NullPointerException("schemaContext should not be null");
+        }
+
+        if (transactionType
             == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
 
             shardMBean.incrementReadOnlyTransactionCount();
 
             return getContext().actorOf(
                 ShardTransaction.props(store.newReadOnlyTransaction(), getSelf(),
-                        schemaContext,datastoreContext, name.toString()), transactionId.toString());
+                        schemaContext,datastoreContext, shardMBean), transactionId.toString());
 
-        } else if (createTransaction.getTransactionType()
+        } else if (transactionType
             == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
 
             shardMBean.incrementReadWriteTransactionCount();
 
             return getContext().actorOf(
                 ShardTransaction.props(store.newReadWriteTransaction(), getSelf(),
-                        schemaContext, datastoreContext,name.toString()), transactionId.toString());
+                        schemaContext, datastoreContext, shardMBean), transactionId.toString());
 
 
-        } else if (createTransaction.getTransactionType()
+        } else if (transactionType
             == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
 
             shardMBean.incrementWriteOnlyTransactionCount();
 
             return getContext().actorOf(
                 ShardTransaction.props(store.newWriteOnlyTransaction(), getSelf(),
-                        schemaContext, datastoreContext, name.toString()), transactionId.toString());
+                        schemaContext, datastoreContext, shardMBean), transactionId.toString());
         } else {
             throw new IllegalArgumentException(
                 "Shard="+name + ":CreateTransaction message has unidentified transaction type="
-                    + createTransaction.getTransactionType());
+                    + transactionType);
         }
     }
 
     private void createTransaction(CreateTransaction createTransaction) {
+        createTransaction(createTransaction.getTransactionType(),
+            createTransaction.getTransactionId());
+    }
+
+    private ActorRef createTransaction(int transactionType, String remoteTransactionId) {
 
         ShardTransactionIdentifier transactionId =
             ShardTransactionIdentifier.builder()
-                .remoteTransactionId(createTransaction.getTransactionId())
+                .remoteTransactionId(remoteTransactionId)
                 .build();
         LOG.debug("Creating transaction : {} ", transactionId);
         ActorRef transactionActor =
-            createTypedTransactionActor(createTransaction, transactionId);
+            createTypedTransactionActor(transactionType, transactionId);
 
         getSender()
             .tell(new CreateTransactionReply(
                     Serialization.serializedActorPath(transactionActor),
-                    createTransaction.getTransactionId()).toSerializable(),
+                    remoteTransactionId).toSerializable(),
                 getSelf());
+
+        return transactionActor;
+    }
+
+    private void syncCommitTransaction(DOMStoreWriteTransaction transaction)
+        throws ExecutionException, InterruptedException {
+        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+        commitCohort.preCommit().get();
+        commitCohort.commit().get();
     }
 
+
     private void commit(final ActorRef sender, Object serialized) {
         Modification modification = MutableCompositeModification
             .fromSerializable(serialized, schemaContext);
@@ -240,16 +299,11 @@ public class Shard extends RaftActor {
             LOG.debug(
                 "Could not find cohort for modification : {}. Writing modification using a new transaction",
                 modification);
-            DOMStoreReadWriteTransaction transaction =
-                store.newReadWriteTransaction();
+            DOMStoreWriteTransaction transaction =
+                store.newWriteOnlyTransaction();
             modification.apply(transaction);
-            DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
-            ListenableFuture<Void> future =
-                commitCohort.preCommit();
             try {
-                future.get();
-                future = commitCohort.commit();
-                future.get();
+                syncCommitTransaction(transaction);
             } catch (InterruptedException | ExecutionException e) {
                 shardMBean.incrementFailedTransactionsCount();
                 LOG.error("Failed to commit", e);
@@ -266,9 +320,9 @@ public class Shard extends RaftActor {
         Futures.addCallback(future, new FutureCallback<Void>() {
             @Override
             public void onSuccess(Void v) {
-               sender.tell(new CommitTransactionReply().toSerializable(),self);
-               shardMBean.incrementCommittedTransactionCount();
-               shardMBean.setLastCommittedTransactionTime(new Date());
+                sender.tell(new CommitTransactionReply().toSerializable(), self);
+                shardMBean.incrementCommittedTransactionCount();
+                shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
             }
 
             @Override
@@ -298,9 +352,14 @@ public class Shard extends RaftActor {
 
     private void updateSchemaContext(UpdateSchemaContext message) {
         this.schemaContext = message.getSchemaContext();
+        updateSchemaContext(message.getSchemaContext());
         store.onGlobalContextUpdated(message.getSchemaContext());
     }
 
+    @VisibleForTesting void updateSchemaContext(SchemaContext schemaContext) {
+        store.onGlobalContextUpdated(schemaContext);
+    }
+
     private void registerChangeListener(
         RegisterChangeListener registerChangeListener) {
 
@@ -345,9 +404,9 @@ public class Shard extends RaftActor {
     private void createTransactionChain() {
         DOMStoreTransactionChain chain = store.createTransactionChain();
         ActorRef transactionChain = getContext().actorOf(
-                ShardTransactionChain.props(chain, schemaContext, datastoreContext,name.toString() ));
+                ShardTransactionChain.props(chain, schemaContext, datastoreContext, shardMBean));
         getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
-                getSelf());
+            getSelf());
     }
 
     @Override protected void applyState(ActorRef clientActor, String identifier,
@@ -365,7 +424,6 @@ public class Shard extends RaftActor {
                     identifier, clientActor.path().toString());
             }
 
-
         } else {
             LOG.error("Unknown state received {}", data);
         }
@@ -383,12 +441,40 @@ public class Shard extends RaftActor {
 
     }
 
-    @Override protected Object createSnapshot() {
-        throw new UnsupportedOperationException("createSnapshot");
+    @Override protected void createSnapshot() {
+        if (createSnapshotTransaction == null) {
+
+            // Create a transaction. We are really going to treat the transaction as a worker
+            // so that this actor does not get block building the snapshot
+            createSnapshotTransaction = createTransaction(
+                TransactionProxy.TransactionType.READ_ONLY.ordinal(),
+                "createSnapshot");
+
+            createSnapshotTransaction.tell(
+                new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
+
+        }
     }
 
-    @Override protected void applySnapshot(Object snapshot) {
-        throw new UnsupportedOperationException("applySnapshot");
+    @VisibleForTesting @Override protected void applySnapshot(ByteString snapshot) {
+        // Since this will be done only on Recovery or when this actor is a Follower
+        // we can safely commit everything in here. We not need to worry about event notifications
+        // as they would have already been disabled on the follower
+        try {
+            DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
+            NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
+            NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext)
+                .decode(YangInstanceIdentifier.builder().build(), serializedNode);
+
+            // delete everything first
+            transaction.delete(YangInstanceIdentifier.builder().build());
+
+            // Add everything from the remote node back
+            transaction.write(YangInstanceIdentifier.builder().build(), node);
+            syncCommitTransaction(transaction);
+        } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
+            LOG.error(e, "An exception occurred when applying snapshot");
+        }
     }
 
     @Override protected void onStateChanged() {
@@ -426,17 +512,42 @@ public class Shard extends RaftActor {
         final ShardIdentifier name;
         final Map<ShardIdentifier, String> peerAddresses;
         final DatastoreContext datastoreContext;
+        final SchemaContext schemaContext;
 
         ShardCreator(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
-                DatastoreContext datastoreContext) {
+                DatastoreContext datastoreContext, SchemaContext schemaContext) {
             this.name = name;
             this.peerAddresses = peerAddresses;
             this.datastoreContext = datastoreContext;
+            this.schemaContext = schemaContext;
         }
 
         @Override
         public Shard create() throws Exception {
-            return new Shard(name, peerAddresses, datastoreContext);
+            return new Shard(name, peerAddresses, datastoreContext, schemaContext);
         }
     }
+
+    @VisibleForTesting NormalizedNode readStore() throws ExecutionException, InterruptedException {
+        DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
+
+        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
+            transaction.read(YangInstanceIdentifier.builder().build());
+
+        NormalizedNode<?, ?> node = future.get().get();
+
+        transaction.close();
+
+        return node;
+    }
+
+    @VisibleForTesting void writeToStore(YangInstanceIdentifier id, NormalizedNode node)
+        throws ExecutionException, InterruptedException {
+        DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
+
+        transaction.write(id, node);
+
+        syncCommitTransaction(transaction);
+    }
+
 }
index e51d49b..58cdefe 100644 (file)
@@ -17,9 +17,7 @@ import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
 import akka.japi.Creator;
 import akka.japi.Function;
-
 import com.google.common.base.Preconditions;
-
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
@@ -32,8 +30,8 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
 
 import java.util.ArrayList;
@@ -89,9 +87,7 @@ public class ShardManager extends AbstractUntypedActor {
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
 
-        // Create all the local Shards and make them a child of the ShardManager
-        // TODO: This may need to be initiated when we first get the schema context
-        createLocalShards();
+        //createLocalShards(null);
     }
 
     public static Props props(final String type,
@@ -162,8 +158,14 @@ public class ShardManager extends AbstractUntypedActor {
      * @param message
      */
     private void updateSchemaContext(Object message) {
-        for(ShardInformation info : localShards.values()){
-            info.getActor().tell(message,getSelf());
+        SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+
+        if(localShards.size() == 0){
+            createLocalShards(schemaContext);
+        } else {
+            for (ShardInformation info : localShards.values()) {
+                info.getActor().tell(message, getSelf());
+            }
         }
     }
 
@@ -235,7 +237,7 @@ public class ShardManager extends AbstractUntypedActor {
      * runs
      *
      */
-    private void createLocalShards() {
+    private void createLocalShards(SchemaContext schemaContext) {
         String memberName = this.cluster.getCurrentMemberName();
         List<String> memberShardNames =
             this.configuration.getMemberShardNames(memberName);
@@ -245,16 +247,14 @@ public class ShardManager extends AbstractUntypedActor {
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
             Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
             ActorRef actor = getContext()
-                .actorOf(Shard.props(shardId, peerAddresses, datastoreContext).
+                .actorOf(Shard.props(shardId, peerAddresses, datastoreContext, schemaContext).
                     withMailbox(ActorContext.MAILBOX), shardId.toString());
-
             localShardActorNames.add(shardId.toString());
             localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
         }
 
-        mBean = ShardManagerInfo
-            .createShardManagerMBean("shard-manager-" + this.type, localShardActorNames);
-
+        mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
+                    datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
     }
 
     /**
index 91d6294..0e9fd11 100644 (file)
@@ -12,6 +12,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
@@ -26,8 +27,8 @@ public class ShardReadTransaction extends ShardTransaction {
     private final DOMStoreReadTransaction transaction;
 
     public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext,String shardName) {
-        super(shardActor, schemaContext, shardName);
+            SchemaContext schemaContext, ShardStats shardStats) {
+        super(shardActor, schemaContext, shardStats);
         this.transaction = transaction;
     }
 
index bd71c27..d04ec23 100644 (file)
@@ -12,6 +12,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
@@ -30,8 +31,8 @@ public class ShardReadWriteTransaction extends ShardTransaction {
     private final DOMStoreReadWriteTransaction transaction;
 
     public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext,String shardName) {
-        super(shardActor, schemaContext, shardName);
+            SchemaContext schemaContext, ShardStats shardStats) {
+        super(shardActor, schemaContext, shardStats);
         this.transaction = transaction;
     }
 
index 3b0e093..65f865b 100644 (file)
@@ -13,10 +13,12 @@ import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.ReceiveTimeout;
 import akka.japi.Creator;
+
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
+
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
@@ -73,22 +75,21 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
 
     private final ActorRef shardActor;
     protected final SchemaContext schemaContext;
-    private final String  shardName;
-
+    private final ShardStats shardStats;
 
     private final MutableCompositeModification modification = new MutableCompositeModification();
 
     protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
-        String shardName) {
+            ShardStats shardStats) {
         this.shardActor = shardActor;
         this.schemaContext = schemaContext;
-        this.shardName = shardName;
+        this.shardStats = shardStats;
     }
 
     public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext,DatastoreContext datastoreContext, String shardName) {
+            SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats) {
         return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
-           datastoreContext, shardName));
+           datastoreContext, shardStats));
     }
 
     protected abstract DOMStoreTransaction getDOMStoreTransaction();
@@ -137,7 +138,7 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
                         sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self);
                     }
                 } catch (Exception e) {
-                    ShardMBeanFactory.getShardStatsMBean(shardName).incrementFailedReadTransactionsCount();
+                    shardStats.incrementFailedReadTransactionsCount();
                     sender.tell(new akka.actor.Status.Failure(e), self);
                 }
 
@@ -196,7 +197,7 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
     protected void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message) {
         DOMStoreThreePhaseCommitCohort cohort =  transaction.ready();
         ActorRef cohortActor = getContext().actorOf(
-            ThreePhaseCommitCohort.props(cohort, shardActor, modification, shardName), "cohort");
+            ThreePhaseCommitCohort.props(cohort, shardActor, modification, shardStats), "cohort");
         getSender()
         .tell(new ReadyTransactionReply(cohortActor.path()).toSerializable(), getSelf());
 
@@ -210,13 +211,14 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
         final ActorRef shardActor;
         final SchemaContext schemaContext;
         final DatastoreContext datastoreContext;
-        final String shardName;
+        final ShardStats shardStats;
 
         ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
-                SchemaContext schemaContext, DatastoreContext datastoreContext, String shardName) {
+                SchemaContext schemaContext, DatastoreContext datastoreContext,
+                ShardStats shardStats) {
             this.transaction = transaction;
             this.shardActor = shardActor;
-            this.shardName = shardName;
+            this.shardStats = shardStats;
             this.schemaContext = schemaContext;
             this.datastoreContext = datastoreContext;
         }
@@ -226,13 +228,13 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
             ShardTransaction tx;
             if(transaction instanceof DOMStoreReadWriteTransaction) {
                 tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
-                        shardActor, schemaContext, shardName);
+                        shardActor, schemaContext, shardStats);
             } else if(transaction instanceof DOMStoreReadTransaction) {
                 tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
-                        schemaContext, shardName);
+                        schemaContext, shardStats);
             } else {
                 tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
-                        shardActor, schemaContext, shardName);
+                        shardActor, schemaContext, shardStats);
             }
 
             tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
index e7a1818..484bd54 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.japi.Creator;
 
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
@@ -27,14 +28,14 @@ public class ShardTransactionChain extends AbstractUntypedActor {
     private final DOMStoreTransactionChain chain;
     private final DatastoreContext datastoreContext;
     private final SchemaContext schemaContext;
-    private final String shardName;
+    private final ShardStats shardStats;
 
     public ShardTransactionChain(DOMStoreTransactionChain chain, SchemaContext schemaContext,
-            DatastoreContext datastoreContext,String shardName) {
+            DatastoreContext datastoreContext, ShardStats shardStats) {
         this.chain = chain;
         this.datastoreContext = datastoreContext;
         this.schemaContext = schemaContext;
-        this.shardName = shardName;
+        this.shardStats = shardStats;
     }
 
     @Override
@@ -60,17 +61,17 @@ public class ShardTransactionChain extends AbstractUntypedActor {
                 TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(),
-                            schemaContext, datastoreContext,shardName), transactionId);
+                            schemaContext, datastoreContext, shardStats), transactionId);
         } else if (createTransaction.getTransactionType() ==
                 TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(),
-                            schemaContext, datastoreContext,shardName), transactionId);
+                            schemaContext, datastoreContext, shardStats), transactionId);
         } else if (createTransaction.getTransactionType() ==
                 TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(),
-                            schemaContext, datastoreContext,shardName), transactionId);
+                            schemaContext, datastoreContext, shardStats), transactionId);
         } else {
             throw new IllegalArgumentException (
                     "CreateTransaction message has unidentified transaction type=" +
@@ -87,8 +88,9 @@ public class ShardTransactionChain extends AbstractUntypedActor {
     }
 
     public static Props props(DOMStoreTransactionChain chain, SchemaContext schemaContext,
-        DatastoreContext datastoreContext, String shardName) {
-        return Props.create(new ShardTransactionChainCreator(chain, schemaContext, datastoreContext, shardName));
+        DatastoreContext datastoreContext, ShardStats shardStats) {
+        return Props.create(new ShardTransactionChainCreator(chain, schemaContext,
+                datastoreContext, shardStats));
     }
 
     private static class ShardTransactionChainCreator implements Creator<ShardTransactionChain> {
@@ -97,20 +99,20 @@ public class ShardTransactionChain extends AbstractUntypedActor {
         final DOMStoreTransactionChain chain;
         final DatastoreContext datastoreContext;
         final SchemaContext schemaContext;
-        final String shardName;
+        final ShardStats shardStats;
 
 
         ShardTransactionChainCreator(DOMStoreTransactionChain chain, SchemaContext schemaContext,
-            DatastoreContext datastoreContext, String shardName) {
+            DatastoreContext datastoreContext, ShardStats shardStats) {
             this.chain = chain;
             this.datastoreContext = datastoreContext;
             this.schemaContext = schemaContext;
-            this.shardName = shardName;
+            this.shardStats = shardStats;
         }
 
         @Override
         public ShardTransactionChain create() throws Exception {
-            return new ShardTransactionChain(chain, schemaContext, datastoreContext,shardName);
+            return new ShardTransactionChain(chain, schemaContext, datastoreContext, shardStats);
         }
     }
 }
index 41c46c3..396b27a 100644 (file)
@@ -12,6 +12,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
@@ -28,8 +29,8 @@ public class ShardWriteTransaction extends ShardTransaction {
     private final DOMStoreWriteTransaction transaction;
 
     public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext,String shardName) {
-        super(shardActor, schemaContext, shardName);
+            SchemaContext schemaContext, ShardStats shardStats) {
+        super(shardActor, schemaContext, shardStats);
         this.transaction = transaction;
     }
 
index 5a6d0ec..2dce6a1 100644 (file)
@@ -19,7 +19,7 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
@@ -35,25 +35,25 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
     private final DOMStoreThreePhaseCommitCohort cohort;
     private final ActorRef shardActor;
     private final CompositeModification modification;
-    private final String shardName;
+    private final ShardStats shardStats;
 
     public ThreePhaseCommitCohort(DOMStoreThreePhaseCommitCohort cohort,
-        ActorRef shardActor, CompositeModification modification,String shardName) {
+        ActorRef shardActor, CompositeModification modification, ShardStats shardStats) {
 
         this.cohort = cohort;
         this.shardActor = shardActor;
         this.modification = modification;
-        this.shardName = shardName;
+        this.shardStats = shardStats;
     }
 
     private final LoggingAdapter log =
         Logging.getLogger(getContext().system(), this);
 
     public static Props props(final DOMStoreThreePhaseCommitCohort cohort,
-        final ActorRef shardActor, final CompositeModification modification,
-        String shardName) {
+            final ActorRef shardActor, final CompositeModification modification,
+            ShardStats shardStats) {
         return Props.create(new ThreePhaseCommitCohortCreator(cohort, shardActor, modification,
-           shardName));
+                shardStats));
     }
 
     @Override
@@ -83,7 +83,7 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
         Futures.addCallback(future, new FutureCallback<Void>() {
             @Override
             public void onSuccess(Void v) {
-                ShardMBeanFactory.getShardStatsMBean(shardName).incrementAbortTransactionsCount();
+                shardStats.incrementAbortTransactionsCount();
                 sender
                     .tell(new AbortTransactionReply().toSerializable(),
                     self);
@@ -154,19 +154,19 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
         final DOMStoreThreePhaseCommitCohort cohort;
         final ActorRef shardActor;
         final CompositeModification modification;
-        final String shardName;
+        final ShardStats shardStats;
 
         ThreePhaseCommitCohortCreator(DOMStoreThreePhaseCommitCohort cohort,
-            ActorRef shardActor, CompositeModification modification, String shardName) {
+            ActorRef shardActor, CompositeModification modification, ShardStats shardStats) {
             this.cohort = cohort;
             this.shardActor = shardActor;
             this.modification = modification;
-            this.shardName = shardName;
+            this.shardStats = shardStats;
         }
 
         @Override
         public ThreePhaseCommitCohort create() throws Exception {
-            return new ThreePhaseCommitCohort(cohort, shardActor, modification, shardName);
+            return new ThreePhaseCommitCohort(cohort, shardActor, modification, shardStats);
         }
     }
 }
index 3c46935..e69de29 100644 (file)
@@ -1,139 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-
-package org.opendaylight.controller.cluster.datastore.jmx.mbeans;
-
-
-import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanRegistrationException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
-
-/**
- * All MBeans should extend this class that help in registering and
- * unregistering the MBeans.
- * @author Basheeruddin <syedbahm@cisco.com>
- */
-
-
-public abstract class AbstractBaseMBean {
-
-
-  public static String BASE_JMX_PREFIX = "org.opendaylight.controller:";
-  public static String JMX_TYPE_DISTRIBUTED_DATASTORE = "DistributedDatastore";
-  public static String JMX_CATEGORY_SHARD = "Shard";
-  public static String JMX_CATEGORY_SHARD_MANAGER = "ShardManager";
-
-  private static final Logger LOG = LoggerFactory
-      .getLogger(AbstractBaseMBean.class);
-
-  MBeanServer server = ManagementFactory.getPlatformMBeanServer();
-  /**
-   * gets the MBean ObjectName
-   *
-   * @return Object name of the MBean
-   * @throws MalformedObjectNameException - The bean name does not have the right format.
-   * @throws NullPointerException - The bean name is null
-   */
-  protected ObjectName getMBeanObjectName()
-      throws MalformedObjectNameException, NullPointerException {
-    String name = BASE_JMX_PREFIX + "type="+getMBeanType()+",Category="+
-                                   getMBeanCategory() + ",name="+
-                                   getMBeanName();
-
-
-    return new ObjectName(name);
-  }
-
-  public boolean registerMBean() {
-    boolean registered = false;
-    try {
-      // Object to identify MBean
-      final ObjectName mbeanName = this.getMBeanObjectName();
-
-      Preconditions.checkArgument(mbeanName != null,
-          "Object name of the MBean cannot be null");
-
-      LOG.debug("Register MBean {}", mbeanName);
-
-      // unregistered if already registered
-      if (server.isRegistered(mbeanName)) {
-
-        LOG.debug("MBean {} found to be already registered", mbeanName);
-
-        try {
-          unregisterMBean(mbeanName);
-        } catch (Exception e) {
-
-          LOG.warn("unregister mbean {} resulted in exception {} ", mbeanName,
-              e);
-        }
-      }
-      server.registerMBean(this, mbeanName);
-
-      LOG.debug("MBean {} registered successfully",
-          mbeanName.getCanonicalName());
-      registered = true;
-    } catch (Exception e) {
-
-      LOG.error("registration failed {}", e);
-
-    }
-    return registered;
-  }
-
-
-  public boolean unregisterMBean() {
-    boolean unregister = false;
-    try {
-      ObjectName mbeanName = this.getMBeanObjectName();
-      unregister = true;
-      unregisterMBean(mbeanName);
-    } catch (Exception e) {
-
-      LOG.error("Failed when unregistering MBean {}", e);
-    }
-    return unregister;
-  }
-
-  private void unregisterMBean(ObjectName mbeanName)
-      throws MBeanRegistrationException, InstanceNotFoundException {
-
-    server.unregisterMBean(mbeanName);
-
-  }
-
-
-  /**
-   * @return name of bean
-   */
-  protected abstract String getMBeanName();
-
-  /**
-   * @return type of the MBean
-   */
-  protected abstract String getMBeanType();
-
-
-  /**
-   * @return Category name of teh bean
-   */
-  protected abstract String getMBeanCategory();
-
-  //require for test cases
-  public MBeanServer getMBeanServer() {
-    return server;
-  }
-}
index 2a409c0..946e525 100644 (file)
@@ -7,28 +7,41 @@
  */
 package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 
 /**
  * @author Basheeruddin syedbahm@cisco.com
  *
  */
 public class ShardMBeanFactory {
-    private static Map<String, ShardStats> shardMBeans =
-        new HashMap<String, ShardStats>();
 
-    public static ShardStats getShardStatsMBean(String shardName) {
-        if (shardMBeans.containsKey(shardName)) {
-            return shardMBeans.get(shardName);
-        } else {
-            ShardStats shardStatsMBeanImpl = new ShardStats(shardName);
+    private static final Logger LOG = LoggerFactory.getLogger(ShardMBeanFactory.class);
 
-            if (shardStatsMBeanImpl.registerMBean()) {
-                shardMBeans.put(shardName, shardStatsMBeanImpl);
-            }
-            return shardStatsMBeanImpl;
+    private static Cache<String,ShardStats> shardMBeansCache =
+                                      CacheBuilder.newBuilder().weakValues().build();
+
+    public static ShardStats getShardStatsMBean(final String shardName, final String mxBeanType) {
+        final String finalMXBeanType = mxBeanType != null ? mxBeanType : "DistDataStore";
+        try {
+            return shardMBeansCache.get(shardName, new Callable<ShardStats>() {
+                @Override
+                public ShardStats call() throws Exception {
+                    ShardStats shardStatsMBeanImpl = new ShardStats(shardName, finalMXBeanType);
+                    shardStatsMBeanImpl.registerMBean();
+                    return shardStatsMBeanImpl;
+                }
+            });
+        } catch(ExecutionException e) {
+            LOG.error(String.format("Could not create MXBean for shard: %s", shardName), e);
+            // Just return an instance that isn't registered.
+            return new ShardStats(shardName, finalMXBeanType);
         }
     }
-
 }
index 22ad8e7..0a1964b 100644 (file)
 
 package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
 
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.AbstractBaseMBean;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
+import org.opendaylight.controller.md.sal.common.util.jmx.QueuedNotificationManagerMXBeanImpl;
+import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStats;
+import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl;
+import org.opendaylight.yangtools.util.concurrent.ListenerNotificationQueueStats;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
 
 import java.text.SimpleDateFormat;
 import java.util.Date;
 
 /**
+ * Maintains statistics for a shard.
+ *
  * @author  Basheeruddin syedbahm@cisco.com
  */
-public class ShardStats extends AbstractBaseMBean implements ShardStatsMBean {
+public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
+    public static String JMX_CATEGORY_SHARD = "Shards";
 
-    private final String shardName;
+    private final AtomicLong committedTransactionsCount = new AtomicLong();
 
-    private long committedTransactionsCount = 0L;
+    private final AtomicLong readOnlyTransactionCount = new AtomicLong();
 
-    private long readOnlyTransactionCount = 0L;
+    private final AtomicLong writeOnlyTransactionCount = new AtomicLong();
 
-    private long writeOnlyTransactionCount = 0L;
-
-    private long readWriteTransactionCount = 0L;
+    private final AtomicLong readWriteTransactionCount = new AtomicLong();
 
     private String leader;
 
     private String raftState;
 
-    private long lastLogTerm = -1L;
+    private volatile long lastLogTerm = -1L;
+
+    private volatile long lastLogIndex = -1L;
 
-    private long lastLogIndex = -1L;
+    private volatile long currentTerm = -1L;
 
-    private long currentTerm = -1L;
+    private volatile long commitIndex = -1L;
 
-    private long commitIndex = -1L;
+    private volatile long lastApplied = -1L;
 
-    private long lastApplied = -1L;
+    private volatile long lastCommittedTransactionTime;
 
-    private Date lastCommittedTransactionTime = new Date(0L);
+    private final AtomicLong failedTransactionsCount = new AtomicLong();
 
-    private long failedTransactionsCount = 0L;
+    private final AtomicLong failedReadTransactionsCount = new AtomicLong();
 
-    private long failedReadTransactionsCount = 0L;
+    private final AtomicLong abortTransactionsCount = new AtomicLong();
 
-    private long abortTransactionsCount = 0L;
+    private ThreadExecutorStatsMXBeanImpl notificationExecutorStatsBean;
 
-    private SimpleDateFormat sdf =
+    private ThreadExecutorStatsMXBeanImpl dataStoreExecutorStatsBean;
+
+    private QueuedNotificationManagerMXBeanImpl notificationManagerStatsBean;
+
+    private final SimpleDateFormat sdf =
         new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
 
-    ShardStats(String shardName) {
-        this.shardName = shardName;
+    public ShardStats(String shardName, String mxBeanType) {
+        super(shardName, mxBeanType, JMX_CATEGORY_SHARD);
+    }
+
+    public void setDataStoreExecutor(ExecutorService dsExecutor) {
+        this.dataStoreExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(dsExecutor,
+                "notification-executor", getMBeanType(), getMBeanCategory());
     }
 
+    public void setNotificationManager(QueuedNotificationManager<?, ?> manager) {
+        this.notificationManagerStatsBean = new QueuedNotificationManagerMXBeanImpl(manager,
+                "notification-manager", getMBeanType(), getMBeanCategory());
+
+        this.notificationExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(manager.getExecutor(),
+                "data-store-executor", getMBeanType(), getMBeanCategory());
+    }
 
     @Override
     public String getShardName() {
-        return shardName;
+        return getMBeanName();
     }
 
     @Override
     public long getCommittedTransactionsCount() {
-        return committedTransactionsCount;
+        return committedTransactionsCount.get();
     }
 
-    @Override public String getLeader() {
+    @Override
+    public String getLeader() {
         return leader;
     }
 
-    @Override public String getRaftState() {
+    @Override
+    public String getRaftState() {
         return raftState;
     }
 
-    @Override public long getReadOnlyTransactionCount() {
-        return readOnlyTransactionCount;
+    @Override
+    public long getReadOnlyTransactionCount() {
+        return readOnlyTransactionCount.get();
     }
 
-    @Override public long getWriteOnlyTransactionCount() {
-        return writeOnlyTransactionCount;
+    @Override
+    public long getWriteOnlyTransactionCount() {
+        return writeOnlyTransactionCount.get();
     }
 
-    @Override public long getReadWriteTransactionCount() {
-        return readWriteTransactionCount;
+    @Override
+    public long getReadWriteTransactionCount() {
+        return readWriteTransactionCount.get();
     }
 
-    @Override public long getLastLogIndex() {
+    @Override
+    public long getLastLogIndex() {
         return lastLogIndex;
     }
 
-    @Override public long getLastLogTerm() {
+    @Override
+    public long getLastLogTerm() {
         return lastLogTerm;
     }
 
-    @Override public long getCurrentTerm() {
+    @Override
+    public long getCurrentTerm() {
         return currentTerm;
     }
 
-    @Override public long getCommitIndex() {
+    @Override
+    public long getCommitIndex() {
         return commitIndex;
     }
 
-    @Override public long getLastApplied() {
+    @Override
+    public long getLastApplied() {
         return lastApplied;
     }
 
     @Override
     public String getLastCommittedTransactionTime() {
 
-        return sdf.format(lastCommittedTransactionTime);
+        return sdf.format(new Date(lastCommittedTransactionTime));
     }
 
-    @Override public long getFailedTransactionsCount() {
-        return failedTransactionsCount;
+    @Override
+    public long getFailedTransactionsCount() {
+        return failedTransactionsCount.get();
     }
 
-    @Override public long getFailedReadTransactionsCount() {
-        return failedReadTransactionsCount;
+    @Override
+    public long getFailedReadTransactionsCount() {
+        return failedReadTransactionsCount.get();
     }
 
-    @Override public long getAbortTransactionsCount() {
-        return abortTransactionsCount;
+    @Override
+    public long getAbortTransactionsCount() {
+        return abortTransactionsCount.get();
     }
 
     public long incrementCommittedTransactionCount() {
-        return committedTransactionsCount++;
+        return committedTransactionsCount.incrementAndGet();
     }
 
     public long incrementReadOnlyTransactionCount() {
-        return readOnlyTransactionCount++;
+        return readOnlyTransactionCount.incrementAndGet();
     }
 
     public long incrementWriteOnlyTransactionCount() {
-        return writeOnlyTransactionCount++;
+        return writeOnlyTransactionCount.incrementAndGet();
     }
 
     public long incrementReadWriteTransactionCount() {
-        return readWriteTransactionCount++;
+        return readWriteTransactionCount.incrementAndGet();
     }
 
     public long incrementFailedTransactionsCount() {
-        return failedTransactionsCount++;
+        return failedTransactionsCount.incrementAndGet();
     }
 
     public long incrementFailedReadTransactionsCount() {
-        return failedReadTransactionsCount++;
+        return failedReadTransactionsCount.incrementAndGet();
     }
 
-    public long incrementAbortTransactionsCount () { return abortTransactionsCount++;}
+    public long incrementAbortTransactionsCount ()
+    {
+        return abortTransactionsCount.incrementAndGet();
+    }
 
     public void setLeader(String leader) {
         this.leader = leader;
@@ -180,49 +224,50 @@ public class ShardStats extends AbstractBaseMBean implements ShardStatsMBean {
         this.lastApplied = lastApplied;
     }
 
-
-    public void setLastCommittedTransactionTime(
-        Date lastCommittedTransactionTime) {
+    public void setLastCommittedTransactionTime(long lastCommittedTransactionTime) {
         this.lastCommittedTransactionTime = lastCommittedTransactionTime;
     }
 
     @Override
-    protected String getMBeanName() {
-        return shardName;
+    public ThreadExecutorStats getDataStoreExecutorStats() {
+        return dataStoreExecutorStatsBean.toThreadExecutorStats();
+    }
+
+    @Override
+    public ThreadExecutorStats getNotificationMgrExecutorStats() {
+        return notificationExecutorStatsBean.toThreadExecutorStats();
     }
 
     @Override
-    protected String getMBeanType() {
-        return JMX_TYPE_DISTRIBUTED_DATASTORE;
+    public List<ListenerNotificationQueueStats> getCurrentNotificationMgrListenerQueueStats() {
+        return notificationManagerStatsBean.getCurrentListenerQueueStats();
     }
 
     @Override
-    protected String getMBeanCategory() {
-        return JMX_CATEGORY_SHARD;
+    public int getMaxNotificationMgrListenerQueueSize() {
+        return notificationManagerStatsBean.getMaxListenerQueueSize();
     }
 
     /**
      * resets the counters related to transactions
      */
-
+    @Override
     public void resetTransactionCounters(){
-        committedTransactionsCount = 0L;
+        committedTransactionsCount.set(0);
 
-        readOnlyTransactionCount = 0L;
+        readOnlyTransactionCount.set(0);
 
-        writeOnlyTransactionCount = 0L;
+        writeOnlyTransactionCount.set(0);
 
-        readWriteTransactionCount = 0L;
+        readWriteTransactionCount.set(0);
 
-        lastCommittedTransactionTime = new Date(0L);
+        lastCommittedTransactionTime = 0;
 
-        failedTransactionsCount = 0L;
+        failedTransactionsCount.set(0);
 
-        failedReadTransactionsCount = 0L;
+        failedReadTransactionsCount.set(0);
 
-        abortTransactionsCount = 0L;
+        abortTransactionsCount.set(0);
 
     }
-
-
 }
index c16f842..e69de29 100644 (file)
@@ -1,41 +0,0 @@
-package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
-
-/**
- * @author: syedbahm
- */
-public interface ShardStatsMBean {
-    String getShardName();
-
-    long getCommittedTransactionsCount();
-
-    String getLeader();
-
-    String getRaftState();
-
-    long getReadOnlyTransactionCount();
-
-    long getWriteOnlyTransactionCount();
-
-    long getReadWriteTransactionCount();
-
-    long getLastLogIndex();
-
-    long getLastLogTerm();
-
-    long getCurrentTerm();
-
-    long getCommitIndex();
-
-    long getLastApplied();
-
-    String getLastCommittedTransactionTime();
-
-    long getFailedTransactionsCount();
-
-    long getFailedReadTransactionsCount();
-
-    long getAbortTransactionsCount();
-
-    void resetTransactionCounters();
-
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java
new file mode 100644 (file)
index 0000000..8deb0ae
--- /dev/null
@@ -0,0 +1,54 @@
+package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
+
+import java.util.List;
+
+import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStats;
+import org.opendaylight.yangtools.util.concurrent.ListenerNotificationQueueStats;
+
+/**
+ * @author: syedbahm
+ */
+public interface ShardStatsMXBean {
+
+   String getShardName();
+
+   long getCommittedTransactionsCount();
+
+   long getReadOnlyTransactionCount();
+
+   long getWriteOnlyTransactionCount();
+
+   long getReadWriteTransactionCount();
+
+   long getLastLogIndex();
+
+   long getLastLogTerm();
+
+   long getCurrentTerm();
+
+   long getCommitIndex();
+
+   long getLastApplied();
+
+   String getLastCommittedTransactionTime();
+
+   long getFailedTransactionsCount();
+
+   long getAbortTransactionsCount();
+
+   long getFailedReadTransactionsCount();
+
+   String getLeader();
+
+   String getRaftState();
+
+   ThreadExecutorStats getDataStoreExecutorStats();
+
+   ThreadExecutorStats getNotificationMgrExecutorStats();
+
+   List<ListenerNotificationQueueStats> getCurrentNotificationMgrListenerQueueStats();
+
+   int getMaxNotificationMgrListenerQueueSize();
+
+   void resetTransactionCounters();
+}
index 0c609b4..99c8daf 100644 (file)
@@ -8,44 +8,32 @@
 
 package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager;
 
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.AbstractBaseMBean;
-
 import java.util.List;
 
-public class ShardManagerInfo extends AbstractBaseMBean implements
-    ShardManagerInfoMBean {
-
-    private final String name;
-    private final List<String> localShards;
-
-    public ShardManagerInfo(String name, List<String> localShards) {
-        this.name = name;
-        this.localShards = localShards;
-    }
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
 
+public class ShardManagerInfo extends AbstractMXBean implements ShardManagerInfoMBean {
 
-    @Override protected String getMBeanName() {
-        return name;
-    }
+    public static String JMX_CATEGORY_SHARD_MANAGER = "ShardManager";
 
-    @Override protected String getMBeanType() {
-        return JMX_TYPE_DISTRIBUTED_DATASTORE;
-    }
+    private final List<String> localShards;
 
-    @Override protected String getMBeanCategory() {
-        return JMX_CATEGORY_SHARD_MANAGER;
+    public ShardManagerInfo(String name, String mxBeanType, List<String> localShards) {
+        super(name, mxBeanType, JMX_CATEGORY_SHARD_MANAGER);
+        this.localShards = localShards;
     }
 
-    public static ShardManagerInfo createShardManagerMBean(String name, List<String> localShards){
-        ShardManagerInfo shardManagerInfo = new ShardManagerInfo(name,
-            localShards);
+    public static ShardManagerInfo createShardManagerMBean(String name, String mxBeanType,
+            List<String> localShards){
+        ShardManagerInfo shardManagerInfo = new ShardManagerInfo(name, mxBeanType, localShards);
 
         shardManagerInfo.registerMBean();
 
         return shardManagerInfo;
     }
 
-    @Override public List<String> getLocalShards() {
+    @Override
+    public List<String> getLocalShards() {
         return localShards;
     }
 }
index c5498ca..fc6bcff 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -46,4 +47,9 @@ public class ReadDataReply implements SerializableMessage{
     ShardTransactionMessages.ReadDataReply o = (ShardTransactionMessages.ReadDataReply) serializable;
     return new ReadDataReply(schemaContext,new NormalizedNodeToNodeCodec(schemaContext).decode(id, o.getNormalizedNode()));
   }
+
+  public static ByteString getNormalizedNodeByteString(Object serializable){
+      ShardTransactionMessages.ReadDataReply o = (ShardTransactionMessages.ReadDataReply) serializable;
+      return ((ShardTransactionMessages.ReadDataReply) serializable).getNormalizedNode().toByteString();
+  }
 }