Merge "Bug 1637: Change Rpc actor calls to async"
authorMoiz Raja <moraja@cisco.com>
Thu, 4 Sep 2014 22:47:29 +0000 (22:47 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 4 Sep 2014 22:47:29 +0000 (22:47 +0000)
75 files changed:
features/akka/pom.xml [new file with mode: 0644]
features/akka/src/main/resources/features.xml [new file with mode: 0644]
features/base/pom.xml
features/base/src/main/resources/features.xml
features/mdsal/pom.xml
features/mdsal/src/main/resources/features.xml
features/pom.xml
opendaylight/commons/opendaylight/pom.xml
opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/etc/custom.properties
opendaylight/distribution/opendaylight/pom.xml
opendaylight/distribution/opendaylight/src/main/resources/configuration/config.ini
opendaylight/md-sal/sal-akka-raft/pom.xml
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SerializationUtils.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/InstallSnapshotMessages.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/resources/InstallSnapshot.proto [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingTranslatedTransactionChain.java
opendaylight/md-sal/sal-clustering-commons/pom.xml
opendaylight/md-sal/sal-clustering-config/pom.xml
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf
opendaylight/md-sal/sal-distributed-datastore/pom.xml
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ActorSystemFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreProperties.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModuleFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModuleFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/TestUtils.java
opendaylight/md-sal/sal-remoterpc-connector/pom.xml
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorSystemFactory.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/AkkaConfigurationReader.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/DefaultAkkaConfigurationReader.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/ActorSystemFactoryTest.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/rest/connector/RestConnectorModule.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/RestconfApplication.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfImpl.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfProviderImpl.java [moved from opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/RestconfProviderImpl.java with 59% similarity]
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/StatisticsRestconfServiceWrapper.java [new file with mode: 0644]
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServer.java
opendaylight/md-sal/sal-rest-connector/src/main/yang/opendaylight-rest-connector.yang

diff --git a/features/akka/pom.xml b/features/akka/pom.xml
new file mode 100644 (file)
index 0000000..f1f3017
--- /dev/null
@@ -0,0 +1,264 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Necessary TODO: Put your copyright here.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+   <modelVersion>4.0.0</modelVersion>
+   <parent>
+    <groupId>org.opendaylight.controller</groupId>
+    <artifactId>commons.opendaylight</artifactId>
+    <version>1.4.2-SNAPSHOT</version>
+    <relativePath>../../opendaylight/commons/opendaylight</relativePath>
+   </parent>
+   <artifactId>features-akka</artifactId>
+   <groupId>org.opendaylight.controller</groupId>
+   <packaging>jar</packaging>
+   <properties>
+      <features.file>features.xml</features.file>
+      <!-- Optional TODO: Move these properties to your parent pom and possibly
+            DependencyManagement section of your parent pom -->
+      <branding.version>1.0.0-SNAPSHOT</branding.version>
+      <karaf.resources.version>1.4.2-SNAPSHOT</karaf.resources.version>
+      <karaf.version>3.0.1</karaf.version>
+      <feature.test.version>0.6.2-SNAPSHOT</feature.test.version>
+      <karaf.empty.version>1.4.2-SNAPSHOT</karaf.empty.version>
+      <surefire.version>2.16</surefire.version>
+   </properties>
+   <dependencies>
+    <!--
+      Necessary TODO: Put dependencies on any feature repos
+      you use in your features.xml file.
+
+      Note: they will need to be <type>xml</xml>
+      and <classifier>features</classifier>.
+      One other thing to watch for is to make sure they are
+      <scope>compile</compile>, which they should be by default,
+      but be cautious lest they be at a different scope in a parent pom.
+
+      Examples:
+        <dependency>
+          <groupId>org.opendaylight.yangtools</groupId>
+          <artifactId>features-yangtools</artifactId>
+          <version>0.6.2-SNAPSHOT</version>
+          <classifier>features</classifier>
+          <type>xml</type>
+        </dependency>
+        <dependency>
+          <groupId>org.opendaylight.controller</groupId>
+          <artifactId>features-mdsal</artifactId>
+          <version>1.1-SNAPSHOT</version>
+          <classifier>features</classifier>
+          <type>xml</type>
+        </dependency>
+        <dependency>
+          <groupId>org.opendaylight.openflowplugin</groupId>
+          <artifactId>features-openflowplugin</artifactId>
+          <version>0.0.3-SNAPSHOT</version>
+          <classifier>features</classifier>
+          <type>xml</type>
+        </dependency>
+    -->
+
+    <!--
+      Necessary TODO: Put dependencies for bundles directly referenced
+      in your features.xml file.  For every <bundle> reference in your
+      features.xml file, you need a corresponding dependency here.
+
+      Examples:
+      <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>controller-provider</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>controller-model</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+    -->
+
+    <!--
+      Necessary TODO: Put dependencies for configfiles directly referenced
+      in your features.xml file.  For every <configfile> reference in your
+      features.xml file, you need a corresponding dependency here.
+
+      Example (presuming here version is coming from the parent pom):
+      <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>controller-config</artifactId>
+        <version>${project.version}</version>
+        <type>xml</type>
+        <classifier>config</classifier>
+      </dependency>
+    -->
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>${scala.version}.${scala.micro.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-reflect</artifactId>
+      <version>${scala.version}.${scala.micro.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe</groupId>
+      <artifactId>config</artifactId>
+      <version>${typesafe.config.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-actor_${scala.version}</artifactId>
+      <version>${akka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-slf4j_${scala.version}</artifactId>
+      <version>${akka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-osgi_${scala.version}</artifactId>
+      <version>${akka.version}</version>
+    </dependency>
+    <dependency>
+        <groupId>org.uncommons.maths</groupId>
+        <artifactId>uncommons-maths</artifactId>
+        <version>${uncommons.maths.version}</version>
+        <exclusions>
+            <exclusion>
+                <groupId>jfree</groupId>
+                <artifactId>jcommon</artifactId>
+            </exclusion>
+            <exclusion>
+                <groupId>jfree</groupId>
+                <artifactId>jfreechart</artifactId>
+            </exclusion>
+        </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>${protobuf.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty</artifactId>
+      <version>3.8.0.Final</version>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-remote_${scala.version}</artifactId>
+      <version>${akka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-cluster_${scala.version}</artifactId>
+      <version>${akka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.iq80.leveldb</groupId>
+      <artifactId>leveldb</artifactId>
+      <version>${leveldb.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.fusesource.leveldbjni</groupId>
+      <artifactId>leveldbjni-all</artifactId>
+      <version>${leveldbjni.version}</version>
+    </dependency>
+    <!--
+      Optional TODO: Remove TODO comments.
+    -->
+    <!-- test to validate features.xml -->
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>features-test</artifactId>
+      <version>${feature.test.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <!-- dependency for opendaylight-karaf-empty for use by testing -->
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>opendaylight-karaf-empty</artifactId>
+      <version>${karaf.empty.version}</version>
+      <type>zip</type>
+    </dependency>
+    <!-- Uncomment this if you get an error : java.lang.NoSuchMethodError: org.slf4j.helpers.MessageFormatter.format(Ljava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)Lorg/slf4j/helpers/FormattingTuple;
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-simple</artifactId>
+      <version>1.7.2</version>
+    </dependency>
+    -->
+
+   </dependencies>
+   <build>
+      <resources>
+         <resource>
+            <directory>src/main/resources</directory>
+            <filtering>true</filtering>
+         </resource>
+      </resources>
+      <plugins>
+         <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-resources-plugin</artifactId>
+            <executions>
+               <execution>
+                  <id>filter</id>
+                  <phase>generate-resources</phase>
+                  <goals>
+                     <goal>resources</goal>
+                  </goals>
+               </execution>
+            </executions>
+         </plugin>
+         <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <executions>
+               <execution>
+                  <id>attach-artifacts</id>
+                  <phase>package</phase>
+                  <goals>
+                     <goal>attach-artifact</goal>
+                  </goals>
+                  <configuration>
+                     <artifacts>
+                        <artifact>
+                           <file>${project.build.directory}/classes/${features.file}</file>
+                           <type>xml</type>
+                           <classifier>features</classifier>
+                        </artifact>
+                     </artifacts>
+                  </configuration>
+               </execution>
+            </executions>
+         </plugin>
+         <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <version>${surefire.version}</version>
+            <configuration>
+              <systemPropertyVariables>
+                <karaf.distro.groupId>org.opendaylight.controller</karaf.distro.groupId>
+                <karaf.distro.artifactId>opendaylight-karaf-empty</karaf.distro.artifactId>
+                <karaf.distro.version>${karaf.empty.version}</karaf.distro.version>
+              </systemPropertyVariables>
+              <dependenciesToScan>
+               <dependency>org.opendaylight.yangtools:features-test</dependency>
+              </dependenciesToScan>
+            </configuration>
+          </plugin>
+      </plugins>
+   </build>
+   <scm>
+      <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
+      <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+      <tag>HEAD</tag>
+      <url>https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=summary</url>
+   </scm>
+</project>
diff --git a/features/akka/src/main/resources/features.xml b/features/akka/src/main/resources/features.xml
new file mode 100644 (file)
index 0000000..182ff76
--- /dev/null
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- vi: set et smarttab sw=4 tabstop=4: -->
+<!--
+ Necessary TODO: Put your copyright statement here
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<features name="odl-controller-${project.version}" xmlns="http://karaf.apache.org/xmlns/features/v1.2.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://karaf.apache.org/xmlns/features/v1.2.0 http://karaf.apache.org/xmlns/features/v1.2.0">
+    <!--
+        Necessary TODO: Please read the features guidelines:
+        https://wiki.opendaylight.org/view/Runtime:Karaf_Features_Guidelines#Feature_Best_Practices
+    -->
+    <!--
+    Necessary TODO: Add repo entries for the repositories of features you refer to
+        in this feature file but do not define here.
+        Examples:
+            <repository>mvn:org.opendaylight.yangtools/features-yangtools/0.6.2-SNAPSHOT/xml/features</repository>
+            <repository>mvn:org.opendaylight.controller/features-mdsal/1.1-SNAPSHOT/xml/features</repository>
+            <repository>mvn:org.opendaylight.openflowplugin/features-openflowplugin/0.0.3-SNAPSHOT/xml/features</repository>
+    -->
+    <feature name='odl-akka-all' version='${project.version}' description='OpenDaylight :: Akka :: All'>
+        <!--
+            Necessary TODO:
+            List all of the user consumable features you define in this feature file here.
+            Generally you would *not* list individual bundles here, but only features defined in *this* file.
+            It is useful to list them in the same order they occur in the file.
+
+            Examples:
+            <feature version='${project.version}'>odl-controller-provider</feature>
+            <feature version='${project.version}'>odl-controller-model</feature>
+        -->
+        <feature version="${scala.version}">odl-akka-scala</feature>
+        <feature version="${akka.version}">odl-akka-system</feature>
+        <feature version="${akka.version}">odl-akka-clustering</feature>
+        <feature version='0.7'>odl-akka-leveldb</feature>
+        <feature version="${akka.version}">odl-akka-persistence</feature>
+    </feature>
+    <!--
+        Necessary TODO: Define your features.  It is useful to list then in order of dependency.  So if A depends on B, list A first.
+        When naming your features please be mindful of the guidelines:
+            https://wiki.opendaylight.org/view/Runtime:Karaf_Features_Guidelines
+        Particularly:
+            a) Prefixing names with 'odl-': https://wiki.opendaylight.org/view/Runtime:Karaf_Features_Guidelines#Feature_Naming
+            b) Descriptions: https://wiki.opendaylight.org/view/Runtime:Karaf_Features_Guidelines#Description
+            c) Avoid start-levels: https://wiki.opendaylight.org/view/Runtime:Karaf_Features_Guidelines#Avoid_start-levels
+
+        It's also nice to list inside a feature, first the features it needs, then the bundles it needs, then the configfiles.
+        Examples:
+
+        * Basic MD-SAL Provider
+        <feature name='odl-controller-provider' version='${project.version}' description='OpenDaylight :: controller :: Provider '>
+            <feature version='1.1-SNAPSHOT'>odl-mdsal-broker</feature>
+            <feature version='${project.version}'>odl-controller-model</feature>
+            <bundle>mvn:org.opendaylight.controller/controller-provider/${project.version}</bundle>
+            ... whatever other bundles you need
+        </feature>
+
+        * Basic MD-SAL Model feature
+        <feature name='odl-controller-model' version='${project.version}' description='OpenDaylight :: controller :: Model'>
+            <feature version='0.6.2-SNAPSHOT'>odl-yangtools-binding</feature>
+            <feature version='0.6.2-SNAPSHOT'>odl-yangtools-models</feature>
+            <bundle>mvn:org.opendaylight.controller/controller-model/${project.version}</bundle>
+            ... whatever other bundles you need
+        </feature>
+
+        * Config Subsystem example - the config file is your config subsystem configuration
+        <feature name='odl-controller-provider' version='${project.version}' description='OpenDaylight :: controller :: Provider'>
+            <feature version='1.1-SNAPSHOT'>odl-mdsal-broker</feature>
+            <bundle>mvn:org.opendaylight.controller/controller-provider/${project.version}</bundle>
+            <configfile finalname="etc/opendaylight/karaf/80-controller.xml">mvn:org.opendaylight.controller/controller-config/${project.version}/xml/config</configfile>
+            ... whatever other bundles you need
+        </feature>
+
+        * Basic MD-SAL Provider that uses openflowplugin-flow-services (which brings along odl-mdsal-broker)
+        <feature name='odl-controller-provider' version='${project.version}' description='OpenDaylight :: controller :: Provider'>
+            <feature version='0.0.3-SNAPSHOT'>odl-openflowplugin-flow-services</feature>
+            <bundle>mvn:org.opendaylight.controller/controller-provider/${project.version}</bundle>
+            ... whatever other bundles you need
+        </feature>
+
+    -->
+    <feature name="odl-akka-scala" description="Scala Runtime for OpenDaylight" version="${scala.version}">
+        <bundle>mvn:org.scala-lang/scala-library/${scala.version}.${scala.micro.version}</bundle>
+        <bundle>mvn:org.scala-lang/scala-reflect/${scala.version}.${scala.micro.version}</bundle>
+    </feature>
+    <feature name="odl-akka-system" description="Akka Actor Framework System Bundles" version="${akka.version}">
+        <feature version="${scala.version}">odl-akka-scala</feature>
+        <bundle>mvn:com.typesafe/config/${typesafe.config.version}</bundle>
+        <bundle>mvn:com.typesafe.akka/akka-actor_${scala.version}/${akka.version}</bundle>
+        <bundle>mvn:com.typesafe.akka/akka-slf4j_${scala.version}/${akka.version}</bundle>
+        <bundle>mvn:com.typesafe.akka/akka-osgi_${scala.version}/${akka.version}</bundle>
+    </feature>
+    <feature name="odl-akka-clustering" description="Akka Clustering" version="${akka.version}">
+        <feature version="${akka.version}">odl-akka-system</feature>
+        <bundle>wrap:mvn:org.uncommons.maths/uncommons-maths/${uncommons.maths.version}</bundle>
+        <bundle>mvn:com.google.protobuf/protobuf-java/${protobuf.version}</bundle>
+        <bundle>mvn:io.netty/netty/3.8.0.Final</bundle>
+        <bundle>mvn:com.typesafe.akka/akka-remote_${scala.version}/${akka.version}</bundle>
+        <bundle>mvn:com.typesafe.akka/akka-cluster_${scala.version}/${akka.version}</bundle>
+    </feature>
+    <feature name='odl-akka-leveldb' description='LevelDB' version='0.7'>
+        <bundle>wrap:mvn:org.iq80.leveldb/leveldb/${leveldb.version}</bundle>
+        <bundle>mvn:org.fusesource.leveldbjni/leveldbjni-all/${leveldbjni.version}</bundle>
+    </feature>
+    <feature name='odl-akka-persistence' description='Akka Persistence' version="${akka.version}">
+        <feature version='0.7'>odl-akka-leveldb</feature>
+        <feature version="${akka.version}">odl-akka-system</feature>
+        <bundle>mvn:com.typesafe.akka/akka-persistence-experimental_${scala.version}/${akka.version}</bundle>
+        <bundle>wrap:mvn:com.google.protobuf/protobuf-java/${protobuf.version}$overwrite=merge&amp;DynamicImport-Package=org.opendaylight.controller.protobuff.messages.*;org.opendaylight.controller.cluster.raft.protobuff.client.messages.*</bundle>
+    </feature>
+    <!-- Optional TODO: Remove TODO Comments -->
+
+</features>
index ed8e2a8c20d2765ade8cbe42856f8b94460eeb40..8fec90fd9dc3ddc9ef3c3e80ecb040b617c44515 100644 (file)
     <dependency>
       <groupId>orbit</groupId>
       <artifactId>org.apache.catalina</artifactId>
-      <version>7.0.53.v201406061610</version>
     </dependency>
     <dependency>
       <groupId>orbit</groupId>
       <artifactId>org.apache.catalina.ha</artifactId>
-      <version>7.0.53.v201406070630</version>
     </dependency>
     <dependency>
       <groupId>orbit</groupId>
       <artifactId>org.apache.catalina.tribes</artifactId>
-      <version>7.0.53.v201406070630</version>
     </dependency>
     <dependency>
       <groupId>orbit</groupId>
       <artifactId>org.apache.coyote</artifactId>
-      <version>7.0.53.v201406070630</version>
     </dependency>
     <dependency>
       <groupId>orbit</groupId>
       <artifactId>org.apache.el</artifactId>
-      <version>7.0.53.v201406060720</version>
     </dependency>
     <dependency>
       <groupId>orbit</groupId>
       <artifactId>org.apache.jasper</artifactId>
-      <version>7.0.53.v201406070630</version>
     </dependency>
     <dependency>
       <groupId>orbit</groupId>
       <artifactId>org.apache.juli.extras</artifactId>
-      <version>7.0.53.v201406060720</version>
     </dependency>
     <dependency>
       <groupId>orbit</groupId>
       <artifactId>org.apache.tomcat.api</artifactId>
-      <version>7.0.53.v201406060720</version>
     </dependency>
     <dependency>
       <groupId>orbit</groupId>
       <artifactId>org.apache.tomcat.util</artifactId>
-      <version>7.0.53.v201406070630</version>
     </dependency>
     <dependency>
       <groupId>org.aopalliance</groupId>
index 999cf704d2bedf212a283864e6d8e4d74f10fd68..e4c455ccca6a7078fb0f319274b4af07b02a3853 100644 (file)
    <feature name="odl-base-tomcat" description="OpenDaylight Tomcat" version="7.0.53">
       <feature>odl-base-gemini-web</feature>
       <feature>odl-base-eclipselink-persistence</feature>
-      <bundle start="true">mvn:orbit/org.apache.catalina/${commons.karaf.catalina}</bundle>
+      <bundle start="true">mvn:orbit/org.apache.catalina/${commons.catalina}</bundle>
       <bundle start="true">mvn:geminiweb/org.eclipse.gemini.web.tomcat/${geminiweb.version}</bundle>
-      <bundle start="true">mvn:orbit/org.apache.catalina.ha/${commons.karaf.catalina.ha}</bundle>
-      <bundle start="true">mvn:orbit/org.apache.catalina.tribes/${commons.karaf.catalina.tribes}</bundle>
-      <bundle start="true">mvn:orbit/org.apache.coyote/${commons.karaf.coyote}</bundle>
-      <bundle start="true">mvn:orbit/org.apache.el/${commons.karaf.el}</bundle>
-      <bundle start="true">mvn:orbit/org.apache.jasper/${commons.karaf.jasper}</bundle>
-      <bundle start="true">mvn:orbit/org.apache.juli.extras/${commons.karaf.juli.version}</bundle>
-      <bundle start="true">mvn:orbit/org.apache.tomcat.api/${commons.karaf.tomcat.api}</bundle>
-      <bundle start="true">mvn:orbit/org.apache.tomcat.util/${commons.karaf.tomcat.util}</bundle>
+      <bundle start="true">mvn:orbit/org.apache.catalina.ha/${commons.catalina.ha}</bundle>
+      <bundle start="true">mvn:orbit/org.apache.catalina.tribes/${commons.catalina.tribes}</bundle>
+      <bundle start="true">mvn:orbit/org.apache.coyote/${commons.coyote}</bundle>
+      <bundle start="true">mvn:orbit/org.apache.el/${commons.el}</bundle>
+      <bundle start="true">mvn:orbit/org.apache.jasper/${commons.jasper}</bundle>
+      <bundle start="true">mvn:orbit/org.apache.juli.extras/${commons.juli.version}</bundle>
+      <bundle start="true">mvn:orbit/org.apache.tomcat.api/${commons.tomcat.api}</bundle>
+      <bundle start="true">mvn:orbit/org.apache.tomcat.util/${commons.tomcat.util}</bundle>
       <bundle start="true" >mvn:org.opendaylight.controller/karaf-tomcat-security/${karaf.security.version}</bundle>
       <bundle start="true">wrap:mvn:virgomirror/org.eclipse.jdt.core.compiler.batch/${eclipse.jdt.core.compiler.batch.version}</bundle>
    </feature>
index c6856c89fb4776718a7ea13cc1074cf71f403e12..38fe92fa8280a03d6f01e1a24035c067a24598fa 100644 (file)
       <classifier>features</classifier>
       <type>xml</type>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>features-akka</artifactId>
+      <version>${commons.opendaylight.version}</version>
+      <classifier>features</classifier>
+      <type>xml</type>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-core-api</artifactId>
       <type>xml</type>
       <classifier>config</classifier>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-distributed-datastore</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-remoterpc-connector</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-clustering-commons</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-akka-raft</artifactId>
+      <version>${mdsal.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-clustering-config</artifactId>
+      <version>${mdsal.version}</version>
+      <type>xml</type>
+      <classifier>config</classifier>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-netconf-connector</artifactId>
index 408be621f514b279a681bc7cdf6d53158f69fbc7..ae73c71ee4c9f46b362c48e98015b5e27f538497 100644 (file)
@@ -7,11 +7,13 @@
     <repository>mvn:org.opendaylight.controller/features-config/${config.version}/xml/features</repository>
     <repository>mvn:org.opendaylight.controller/features-config-persister/${config.version}/xml/features</repository>
     <repository>mvn:org.opendaylight.controller/features-config-netty/${config.version}/xml/features</repository>
+    <repository>mvn:org.opendaylight.controller/features-akka/${commons.opendaylight.version}/xml/features</repository>
     <feature name='odl-mdsal-all' version='${project.version}' description="OpenDaylight :: MDSAL :: All">
         <feature version='${project.version}'>odl-mdsal-broker</feature>
         <feature version='${project.version}'>odl-mdsal-netconf-connector</feature>
         <feature version='${project.version}'>odl-restconf</feature>
         <feature version='${project.version}'>odl-mdsal-xsql</feature>
+        <feature version='${project.version}'>odl-mdsal-clustering</feature>
         <feature version='${project.version}'>odl-toaster</feature>
     </feature>
     <feature name='odl-mdsal-broker' version='${project.version}' description="OpenDaylight :: MDSAL :: Broker">
         <bundle>mvn:com.sun.jersey/jersey-servlet/${jersey.version}</bundle>
         <bundle>wrap:mvn:org.json/json/${org.json.version}</bundle>
     </feature>
+    <feature name ='odl-mdsal-clustering-commons' version='${project.version}'>
+        <feature version='${project.version}'>odl-mdsal-broker</feature>
+        <feature version='${akka.version}'>odl-akka-system</feature>
+        <feature version='${akka.version}'>odl-akka-persistence</feature>
+        <bundle>mvn:org.opendaylight.controller/sal-clustering-commons/${project.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/sal-akka-raft/${project.version}</bundle>
+        <bundle>mvn:com.codahale.metrics/metrics-core/3.0.1</bundle>
+    </feature>
+    <feature name ='odl-mdsal-distributed-datastore' version='${project.version}'>
+        <feature version='${project.version}'>odl-mdsal-broker</feature>
+        <feature version='${project.version}'>odl-mdsal-clustering-commons</feature>
+        <feature version='${akka.version}'>odl-akka-clustering</feature>
+        <bundle>mvn:org.opendaylight.controller/sal-distributed-datastore/${project.version}</bundle>
+    </feature>
+    <feature name ='odl-mdsal-remoterpc-connector' version='${project.version}'>
+        <feature version='${project.version}'>odl-mdsal-broker</feature>
+        <feature version='${project.version}'>odl-mdsal-clustering-commons</feature>
+        <feature version='${akka.version}'>odl-akka-clustering</feature>
+        <feature version='0.7'>odl-akka-leveldb</feature>
+        <bundle>mvn:org.opendaylight.controller/sal-remoterpc-connector/${project.version}</bundle>
+    </feature>
+    <feature name ='odl-mdsal-clustering' version='${project.version}'>
+        <feature version='${project.version}'>odl-mdsal-remoterpc-connector</feature>
+        <feature version='${project.version}'>odl-mdsal-distributed-datastore</feature>
+        <configfile finalname="${config.configfile.directory}/${config.clustering.configfile}">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/config</configfile>
+        <configfile finalname="configuration/initial/akka.conf">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/akkaconf</configfile>
+        <configfile finalname="configuration/initial/module-shards.conf">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/moduleshardconf</configfile>
+        <configfile finalname="configuration/initial/modules.conf">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/moduleconf</configfile>
+    </feature>
 </features>
index 039060b4aeb7bec891d74ccb5ea018ef19855bad..01156cf02a1af9343ff2a49c45feff037bb36e4a 100644 (file)
@@ -26,5 +26,6 @@
     <module>netconf</module>
     <module>protocol-framework</module>
     <module>adsal-compatibility</module>
+    <module>akka</module>
   </modules>
 </project>
\ No newline at end of file
index 2e953072681704e6d4b9b3d1251658d59da4f7f7..b05170ddc842e6011109dc3b4604cc32d53a5251 100644 (file)
     <commmons.northbound.version>0.4.2-SNAPSHOT</commmons.northbound.version>
     <!-- Third Party Versions -->
     <codahale.metrics.version>3.0.1</codahale.metrics.version>
-    <commons.catalina>7.0.32.v201211201336</commons.catalina>
-    <commons.catalina.ha>7.0.32.v201211201952</commons.catalina.ha>
-    <commons.catalina.tribes>7.0.32.v201211201952</commons.catalina.tribes>
-    <commons.coyote>7.0.32.v201211201952</commons.coyote>
-    <commons.el>7.0.32.v201211081135</commons.el>
-    <commons.jasper>7.0.32.v201211201952</commons.jasper>
-    <commons.juli.version>7.0.32.v201211081135</commons.juli.version>
-    <commons.tomcat.api>7.0.32.v201211081135</commons.tomcat.api>
-    <commons.tomcat.util>7.0.32.v201211201952</commons.tomcat.util>
 
-    <commons.karaf.catalina>7.0.53.v201406061610</commons.karaf.catalina>
-    <commons.karaf.catalina.ha>7.0.53.v201406070630</commons.karaf.catalina.ha>
-    <commons.karaf.catalina.tribes>7.0.53.v201406070630</commons.karaf.catalina.tribes>
-    <commons.karaf.coyote>7.0.53.v201406070630</commons.karaf.coyote>
-    <commons.karaf.el>7.0.53.v201406060720</commons.karaf.el>
-    <commons.karaf.jasper>7.0.53.v201406070630</commons.karaf.jasper>
-    <commons.karaf.juli.version>7.0.53.v201406060720</commons.karaf.juli.version>
-    <commons.karaf.tomcat.api>7.0.53.v201406060720</commons.karaf.tomcat.api>
-    <commons.karaf.tomcat.util>7.0.53.v201406070630</commons.karaf.tomcat.util>
+    <commons.catalina>7.0.53.v201406061610</commons.catalina>
+    <commons.catalina.ha>7.0.53.v201406070630</commons.catalina.ha>
+    <commons.catalina.tribes>7.0.53.v201406070630</commons.catalina.tribes>
+    <commons.coyote>7.0.53.v201406070630</commons.coyote>
+    <commons.el>7.0.53.v201406060720</commons.el>
+    <commons.jasper>7.0.53.v201406070630</commons.jasper>
+    <commons.juli.version>7.0.53.v201406060720</commons.juli.version>
+    <commons.tomcat.api>7.0.53.v201406060720</commons.tomcat.api>
+    <commons.tomcat.util>7.0.53.v201406070630</commons.tomcat.util>
 
     <commons.checkstyle.version>0.0.3-SNAPSHOT</commons.checkstyle.version>
     <commons.fileupload.version>1.2.2</commons.fileupload.version>
@@ -77,6 +68,7 @@
     <concurrentlinkedhashmap.version>1.4</concurrentlinkedhashmap.version>
     <config.version>0.2.5-SNAPSHOT</config.version>
     <config.configfile.directory>etc/opendaylight/karaf</config.configfile.directory>
+    <config.clustering.configfile>05-clustering.xml</config.clustering.configfile>
     <config.netty.configfile>00-netty.xml</config.netty.configfile>
     <config.mdsal.configfile>01-mdsal.xml</config.mdsal.configfile>
     <config.xsql.configfile>04-xsql.xml</config.xsql.configfile>
     <topologymanager.shell.version>1.0.0-SNAPSHOT</topologymanager.shell.version>
     <troubleshoot.web.version>0.4.2-SNAPSHOT</troubleshoot.web.version>
     <typesafe.config.version>1.2.0</typesafe.config.version>
-    <uncommons.maths.version>1.2.2</uncommons.maths.version>
+    <uncommons.maths.version>1.2.2a</uncommons.maths.version>
     <usermanager.implementation.version>0.4.2-SNAPSHOT</usermanager.implementation.version>
     <usermanager.northbound.version>0.0.2-SNAPSHOT</usermanager.northbound.version>
     <usermanager.version>0.4.2-SNAPSHOT</usermanager.version>
index c2ac77a5d6f57b3ba24a581a700dead2fcc8a377..a644bf6ee835f0434ec6f4a1678dc48f3fcf9cf2 100644 (file)
@@ -92,6 +92,11 @@ ovsdb.listenPort=6640
 # default Openflow version = 1.0, we also support 1.3.
 # ovsdb.of.version=1.3
 
+# ovsdb can be configured with ml2 to perform l3 forwarding. When used in that scenario, the mac address of the default
+# gateway --on the external subnet-- is expected to be resolved from its inet address. The config below overrides that
+# specific arp/neighDiscovery lookup.
+# ovsdb.l3gateway.mac=00:00:5E:00:02:01
+
 # TLS configuration
 # To enable TLS, set secureChannelEnabled=true and specify the location of controller Java KeyStore and TrustStore files.
 # The Java KeyStore contains controller's private key and certificate. The Java TrustStore contains the trusted certificate
index 4ae35c905f122a6691072df720882ac88b68ddee..fcb452f42239c98d838d580fbbf9e94ce623f89b 100644 (file)
       <groupId>org.opendaylight.controller.thirdparty</groupId>
       <artifactId>net.sf.jung2</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.opendaylight.controller.thirdparty</groupId>
-      <artifactId>org.apache.catalina.filters.CorsFilter</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller.thirdparty</groupId>
       <artifactId>org.openflow.openflowj</artifactId>
           <groupId>org.opendaylight.controller</groupId>
           <artifactId>sal-restconf-broker</artifactId>
         </dependency>
-        <dependency>
-          <groupId>org.opendaylight.controller</groupId>
-          <artifactId>sal-remoterpc-connector</artifactId>
-        </dependency>
 
 
         <dependency>
           <artifactId>jeromq</artifactId>
           <version>0.3.1</version>
         </dependency>
-        <dependency>
-          <groupId>org.opendaylight.controller</groupId>
-          <artifactId>sal-distributed-datastore</artifactId>
-        </dependency>
         <dependency>
           <groupId>org.opendaylight.controller</groupId>
           <artifactId>sal-clustering-config</artifactId>
                 <phase>generate-resources</phase>
                 <configuration>
                    <outputDirectory>${project.build.directory}/configuration</outputDirectory>
-                   <includeArtifactIds>sal-rest-connector-config,config-netty-config,md-sal-config,netconf-config,toaster-config,netconf-connector-config</includeArtifactIds>
-                   <includes>**\/*.xml</includes>
+                    <includeArtifactIds>sal-rest-connector-config,config-netty-config,md-sal-config,netconf-config,toaster-config,netconf-connector-config,sal-clustering-config</includeArtifactIds>
+                    <includes>**\/*.xml,**/*.conf</includes>
                    <excludeTransitive>true</excludeTransitive>
                    <ignorePermissions>false</ignorePermissions>
                 </configuration>
index b2fc3cb386ffa4525fb42dd3fae685c7b6a39e97..530e46e14a89e89fedee8657b19849cc9d0cfac4 100644 (file)
@@ -116,6 +116,11 @@ ovsdb.listenPort=6640
 # default Openflow version = 1.3, we also support 1.0.
 ovsdb.of.version=1.3
 
+# ovsdb can be configured with ml2 to perform l3 forwarding. When used in that scenario, the mac address of the default
+# gateway --on the external subnet-- is expected to be resolved from its inet address. The config below overrides that
+# specific arp/neighDiscovery lookup.
+# ovsdb.l3gateway.mac=00:00:5E:00:02:01
+
 # TLS configuration
 # To enable TLS, set secureChannelEnabled=true and specify the location of controller Java KeyStore and TrustStore files.
 # The Java KeyStore contains controller's private key and certificate. The Java TrustStore contains the trusted certificate
index 325005b239f83cbdf2a28bd7e01fbd1e88eb1ebe..98c81c267fae2c8dd5ec70ca9d663fc33b5e340c 100644 (file)
@@ -97,9 +97,8 @@
         <configuration>
           <instructions>
             <Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
-            <Export-package></Export-package>
-            <Private-Package></Private-Package>
-            <Import-Package></Import-Package>
+            <Export-package>org.opendaylight.cluster.raft</Export-package>
+            <Import-Package>*</Import-Package>
           </instructions>
         </configuration>
       </plugin>
index cbd7ca2d70f5dc090a1e842b75c200cb0c1976b9..c4ff108611d9fbdb177f2ef4ace98bb030d69991 100644 (file)
@@ -12,14 +12,21 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.japi.Creator;
 import com.google.common.base.Optional;
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
 import org.opendaylight.controller.cluster.example.messages.PrintRole;
 import org.opendaylight.controller.cluster.example.messages.PrintState;
 import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -82,14 +89,63 @@ public class ExampleActor extends RaftActor {
         }
     }
 
-    @Override protected Object createSnapshot() {
-        return state;
+    @Override protected void createSnapshot() {
+        ByteString bs = null;
+        try {
+            bs = fromObject(state);
+        } catch (Exception e) {
+            LOG.error("Exception in creating snapshot", e);
+        }
+        getSelf().tell(new CaptureSnapshotReply(bs), null);
     }
 
-    @Override protected void applySnapshot(Object snapshot) {
+    @Override protected void applySnapshot(ByteString snapshot) {
         state.clear();
-        state.putAll((HashMap) snapshot);
-        LOG.debug("Snapshot applied to state :" + ((HashMap) snapshot).size());
+        try {
+            state.putAll((HashMap) toObject(snapshot));
+        } catch (Exception e) {
+           LOG.error("Exception in applying snapshot", e);
+        }
+        LOG.debug("Snapshot applied to state :" + ((HashMap) state).size());
+    }
+
+    private ByteString fromObject(Object snapshot) throws Exception {
+        ByteArrayOutputStream b = null;
+        ObjectOutputStream o = null;
+        try {
+            b = new ByteArrayOutputStream();
+            o = new ObjectOutputStream(b);
+            o.writeObject(snapshot);
+            byte[] snapshotBytes = b.toByteArray();
+            return ByteString.copyFrom(snapshotBytes);
+        } finally {
+            if (o != null) {
+                o.flush();
+                o.close();
+            }
+            if (b != null) {
+                b.close();
+            }
+        }
+    }
+
+    private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
+        Object obj = null;
+        ByteArrayInputStream bis = null;
+        ObjectInputStream ois = null;
+        try {
+            bis = new ByteArrayInputStream(bs.toByteArray());
+            ois = new ObjectInputStream(bis);
+            obj = ois.readObject();
+        } finally {
+            if (bis != null) {
+                bis.close();
+            }
+            if (ois != null) {
+                ois.close();
+            }
+        }
+        return obj;
     }
 
     @Override protected void onStateChanged() {
index d11377dbcb359e58fdbbf3494a20ca6ded6565c4..6192cad2307b99896326619233bf9903f7aeb41a 100644 (file)
@@ -17,4 +17,9 @@ public class ExampleConfigParamsImpl extends DefaultConfigParamsImpl {
     public long getSnapshotBatchCount() {
         return 50;
     }
+
+    @Override
+    public int getSnapshotChunkSize() {
+        return 50;
+    }
 }
index fd6e192bf0497777de2643a1b3f28a2a76b72e42..978ea91089dbcb1a530c9d66f6878ffa06579e2d 100644 (file)
@@ -109,6 +109,8 @@ public class TestDriver {
                 td.printState();
             } else if (command.startsWith("printNodes")) {
                 td.printNodes();
+            } else {
+                System.out.println("Invalid command:" + command);
             }
 
         }
index b5b034afb9cf8edc7635cfca5509c93cbeb457b5..b436bce50061f98d0362ce3df4336c4279977d63 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import com.google.protobuf.ByteString;
+
 import java.util.ArrayList;
 import java.util.List;
 
@@ -16,12 +18,18 @@ import java.util.List;
  */
 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
-    protected final List<ReplicatedLogEntry> journal;
-    protected final Object snapshot;
+    protected List<ReplicatedLogEntry> journal;
+    protected ByteString snapshot;
     protected long snapshotIndex = -1;
     protected long snapshotTerm = -1;
 
-    public AbstractReplicatedLogImpl(Object state, long snapshotIndex,
+    // to be used for rollback during save snapshot failure
+    protected List<ReplicatedLogEntry> snapshottedJournal;
+    protected ByteString previousSnapshot;
+    protected long previousSnapshotIndex = -1;
+    protected long previousSnapshotTerm = -1;
+
+    public AbstractReplicatedLogImpl(ByteString state, long snapshotIndex,
         long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
         this.snapshot = state;
         this.snapshotIndex = snapshotIndex;
@@ -137,11 +145,11 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     @Override
     public boolean isInSnapshot(long logEntryIndex) {
-        return logEntryIndex <= snapshotIndex;
+        return logEntryIndex <= snapshotIndex && snapshotIndex != -1;
     }
 
     @Override
-    public Object getSnapshot() {
+    public ByteString getSnapshot() {
         return snapshot;
     }
 
@@ -160,4 +168,68 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     @Override
     public abstract void removeFromAndPersist(long index);
+
+    @Override
+    public void setSnapshotIndex(long snapshotIndex) {
+        this.snapshotIndex = snapshotIndex;
+    }
+
+    @Override
+    public void setSnapshotTerm(long snapshotTerm) {
+        this.snapshotTerm = snapshotTerm;
+    }
+
+    @Override
+    public void setSnapshot(ByteString snapshot) {
+        this.snapshot = snapshot;
+    }
+
+    @Override
+    public void clear(int startIndex, int endIndex) {
+        journal.subList(startIndex, endIndex).clear();
+    }
+
+    @Override
+    public void snapshotPreCommit(ByteString snapshot, long snapshotCapturedIndex, long snapshotCapturedTerm) {
+        snapshottedJournal = new ArrayList<>(journal.size());
+
+        snapshottedJournal.addAll(journal.subList(0, (int)(snapshotCapturedIndex - snapshotIndex)));
+        clear(0, (int) (snapshotCapturedIndex - snapshotIndex));
+
+        previousSnapshotIndex = snapshotIndex;
+        setSnapshotIndex(snapshotCapturedIndex);
+
+        previousSnapshotTerm = snapshotTerm;
+        setSnapshotTerm(snapshotCapturedTerm);
+
+        previousSnapshot = getSnapshot();
+        setSnapshot(snapshot);
+    }
+
+    @Override
+    public void snapshotCommit() {
+        snapshottedJournal.clear();
+        snapshottedJournal = null;
+        previousSnapshotIndex = -1;
+        previousSnapshotTerm = -1;
+        previousSnapshot = null;
+    }
+
+    @Override
+    public void snapshotRollback() {
+        snapshottedJournal.addAll(journal);
+        journal.clear();
+        journal = snapshottedJournal;
+        snapshottedJournal = null;
+
+        snapshotIndex = previousSnapshotIndex;
+        previousSnapshotIndex = -1;
+
+        snapshotTerm = previousSnapshotTerm;
+        previousSnapshotTerm = -1;
+
+        snapshot = previousSnapshot;
+        previousSnapshot = null;
+
+    }
 }
index 4c6434aec457db0b4899973f36effd0b37e7d14a..ed6439d8c33bceb545927fd6c2f892665b160f8a 100644 (file)
@@ -52,4 +52,9 @@ public interface ConfigParams {
      * @return int
      */
     public int getElectionTimeVariance();
+
+    /**
+     * The size (in bytes) of the snapshot chunk sent from Leader
+     */
+    public int getSnapshotChunkSize();
 }
index 6432fa4811beb64ef13f6869e6e252289c2163be..75c237f5035e57abd61c839835ec3c78548a6157 100644 (file)
@@ -25,6 +25,8 @@ public class DefaultConfigParamsImpl implements ConfigParams {
      */
     private static final int ELECTION_TIME_MAX_VARIANCE = 100;
 
+    private final int SNAPSHOT_CHUNK_SIZE = 2048 * 1000; //2MB
+
 
     /**
      * The interval at which a heart beat message will be sent to the remote
@@ -58,4 +60,9 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     public int getElectionTimeVariance() {
         return ELECTION_TIME_MAX_VARIANCE;
     }
+
+    @Override
+    public int getSnapshotChunkSize() {
+        return SNAPSHOT_CHUNK_SIZE;
+    }
 }
index 988789b4011e4f3ba9e9e9abe6b0c3009704c810..296ce2d24aaa24b3920d789697ddb784ccec7bea 100644 (file)
@@ -19,10 +19,14 @@ import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
 import akka.persistence.UntypedPersistentActor;
+import com.google.common.base.Optional;
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
-import com.google.common.base.Optional;
+import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
@@ -31,10 +35,11 @@ import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 
 import java.io.Serializable;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -98,6 +103,9 @@ public abstract class RaftActor extends UntypedPersistentActor {
      */
     private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
 
+    private CaptureSnapshot captureSnapshot = null;
+
+    private volatile boolean hasSnapshotCaptureInitiated = false;
 
     public RaftActor(String id, Map<String, String> peerAddresses) {
         this(id, peerAddresses, Optional.<ConfigParams>absent());
@@ -125,6 +133,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
             replicatedLog = new ReplicatedLogImpl(snapshot);
 
             context.setReplicatedLog(replicatedLog);
+            context.setLastApplied(snapshot.getLastAppliedIndex());
 
             LOG.debug("Applied snapshot to replicatedLog. " +
                 "snapshotIndex={}, snapshotTerm={}, journal-size={}",
@@ -132,7 +141,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
                 replicatedLog.size());
 
             // Apply the snapshot to the actors state
-            applySnapshot(snapshot.getState());
+            applySnapshot(ByteString.copyFrom(snapshot.getState()));
 
         } else if (message instanceof ReplicatedLogEntry) {
             replicatedLog.append((ReplicatedLogEntry) message);
@@ -164,7 +173,17 @@ public abstract class RaftActor extends UntypedPersistentActor {
                 applyState.getReplicatedLogEntry().getData());
 
         } else if(message instanceof ApplySnapshot ) {
-            applySnapshot(((ApplySnapshot) message).getSnapshot());
+            Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
+
+            LOG.debug("ApplySnapshot called on Follower Actor " +
+                "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
+                snapshot.getLastAppliedTerm());
+            applySnapshot(ByteString.copyFrom(snapshot.getState()));
+
+            //clears the followers log, sets the snapshot index to ensure adjusted-index works
+            replicatedLog = new ReplicatedLogImpl(snapshot);
+            context.setReplicatedLog(replicatedLog);
+            context.setLastApplied(snapshot.getLastAppliedIndex());
 
         } else if (message instanceof FindLeader) {
             getSender().tell(
@@ -174,13 +193,26 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
         } else if (message instanceof SaveSnapshotSuccess) {
             SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
+            LOG.info("SaveSnapshotSuccess received for snapshot");
+
+            context.getReplicatedLog().snapshotCommit();
 
             // TODO: Not sure if we want to be this aggressive with trimming stuff
             trimPersistentData(success.metadata().sequenceNr());
 
         } else if (message instanceof SaveSnapshotFailure) {
+            SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
+
+            LOG.info("saveSnapshotFailure.metadata():{}", saveSnapshotFailure.metadata().toString());
+            LOG.error(saveSnapshotFailure.cause(), "SaveSnapshotFailure received for snapshot Cause:");
 
-            // TODO: Handle failure in saving the snapshot
+            context.getReplicatedLog().snapshotRollback();
+
+            LOG.info("Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
+                "snapshotIndex:{}, snapshotTerm:{}, log-size:{}",
+                context.getReplicatedLog().getSnapshotIndex(),
+                context.getReplicatedLog().getSnapshotTerm(),
+                context.getReplicatedLog().size());
 
         } else if (message instanceof AddRaftPeer){
 
@@ -196,7 +228,25 @@ public abstract class RaftActor extends UntypedPersistentActor {
             RemoveRaftPeer rrp = (RemoveRaftPeer)message;
             context.removePeer(rrp.getName());
 
+        } else if (message instanceof CaptureSnapshot) {
+            LOG.debug("CaptureSnapshot received by actor");
+            CaptureSnapshot cs = (CaptureSnapshot)message;
+            captureSnapshot = cs;
+            createSnapshot();
+
+        } else if (message instanceof CaptureSnapshotReply){
+            LOG.debug("CaptureSnapshotReply received by actor");
+            CaptureSnapshotReply csr = (CaptureSnapshotReply) message;
+
+            ByteString stateInBytes = csr.getSnapshot();
+            LOG.debug("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
+            handleCaptureSnapshotReply(stateInBytes);
+
         } else {
+            if (!(message instanceof AppendEntriesMessages.AppendEntries)
+                && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
+                LOG.debug("onReceiveCommand: message:" + message.getClass());
+            }
 
             RaftState state =
                 currentBehavior.handleMessage(getSender(), message);
@@ -344,7 +394,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
      *
      * @return The current state of the actor
      */
-    protected abstract Object createSnapshot();
+    protected abstract void createSnapshot();
 
     /**
      * This method will be called by the RaftActor during recovery to
@@ -356,7 +406,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
      *
      * @param snapshot A snapshot of the state of the actor
      */
-    protected abstract void applySnapshot(Object snapshot);
+    protected abstract void applySnapshot(ByteString snapshot);
 
     /**
      * This method will be called by the RaftActor when the state of the
@@ -423,11 +473,39 @@ public abstract class RaftActor extends UntypedPersistentActor {
         return peerAddress;
     }
 
+    private void handleCaptureSnapshotReply(ByteString stateInBytes) {
+        // create a snapshot object from the state provided and save it
+        // when snapshot is saved async, SaveSnapshotSuccess is raised.
+
+        Snapshot sn = Snapshot.create(stateInBytes.toByteArray(),
+            context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
+            captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
+            captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
+
+        saveSnapshot(sn);
+
+        LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
+
+        //be greedy and remove entries from in-mem journal which are in the snapshot
+        // and update snapshotIndex and snapshotTerm without waiting for the success,
+
+        context.getReplicatedLog().snapshotPreCommit(stateInBytes,
+            captureSnapshot.getLastAppliedIndex(),
+            captureSnapshot.getLastAppliedTerm());
+
+        LOG.info("Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
+            "and term:{}", captureSnapshot.getLastAppliedIndex(),
+            captureSnapshot.getLastAppliedTerm());
+
+        captureSnapshot = null;
+        hasSnapshotCaptureInitiated = false;
+    }
+
 
     private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
 
         public ReplicatedLogImpl(Snapshot snapshot) {
-            super(snapshot.getState(),
+            super(ByteString.copyFrom(snapshot.getState()),
                 snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
                 snapshot.getUnAppliedEntries());
         }
@@ -476,8 +554,10 @@ public abstract class RaftActor extends UntypedPersistentActor {
             persist(replicatedLogEntry,
                 new Procedure<ReplicatedLogEntry>() {
                     public void apply(ReplicatedLogEntry evt) throws Exception {
-                        // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned.
-                        if (journal.size() > context.getConfigParams().getSnapshotBatchCount()) {
+                        // when a snaphsot is being taken, captureSnapshot != null
+                        if (hasSnapshotCaptureInitiated == false &&
+                            journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0) {
+
                             LOG.info("Initiating Snapshot Capture..");
                             long lastAppliedIndex = -1;
                             long lastAppliedTerm = -1;
@@ -493,26 +573,11 @@ public abstract class RaftActor extends UntypedPersistentActor {
                             LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
                             LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
 
-                            // create a snapshot object from the state provided and save it
-                            // when snapshot is saved async, SaveSnapshotSuccess is raised.
-                            Snapshot sn = Snapshot.create(createSnapshot(),
-                                getFrom(context.getLastApplied() + 1),
-                                lastIndex(), lastTerm(), lastAppliedIndex,
-                                lastAppliedTerm);
-                            saveSnapshot(sn);
-
-                            LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
-
-                            //be greedy and remove entries from in-mem journal which are in the snapshot
-                            // and update snapshotIndex and snapshotTerm without waiting for the success,
-                            // TODO: damage-recovery to be done on failure
-                            journal.subList(0, (int) (lastAppliedIndex - snapshotIndex)).clear();
-                            snapshotIndex = lastAppliedIndex;
-                            snapshotTerm = lastAppliedTerm;
-
-                            LOG.info("Removed in-memory snapshotted entries, " +
-                                "adjusted snaphsotIndex:{}" +
-                                "and term:{}", snapshotIndex, lastAppliedTerm);
+                            // send a CaptureSnapshot to self to make the expensive operation async.
+                            getSelf().tell(new CaptureSnapshot(
+                                lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
+                                null);
+                            hasSnapshotCaptureInitiated = true;
                         }
                         // Send message for replication
                         if (clientActor != null) {
@@ -542,65 +607,6 @@ public abstract class RaftActor extends UntypedPersistentActor {
     }
 
 
-    private static class Snapshot implements Serializable {
-        private final Object state;
-        private final List<ReplicatedLogEntry> unAppliedEntries;
-        private final long lastIndex;
-        private final long lastTerm;
-        private final long lastAppliedIndex;
-        private final long lastAppliedTerm;
-
-        private Snapshot(Object state,
-            List<ReplicatedLogEntry> unAppliedEntries, long lastIndex,
-            long lastTerm, long lastAppliedIndex, long lastAppliedTerm) {
-            this.state = state;
-            this.unAppliedEntries = unAppliedEntries;
-            this.lastIndex = lastIndex;
-            this.lastTerm = lastTerm;
-            this.lastAppliedIndex = lastAppliedIndex;
-            this.lastAppliedTerm = lastAppliedTerm;
-        }
-
-
-        public static Snapshot create(Object state,
-            List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
-            long lastAppliedIndex, long lastAppliedTerm) {
-            return new Snapshot(state, entries, lastIndex, lastTerm,
-                lastAppliedIndex, lastAppliedTerm);
-        }
-
-        public Object getState() {
-            return state;
-        }
-
-        public List<ReplicatedLogEntry> getUnAppliedEntries() {
-            return unAppliedEntries;
-        }
-
-        public long getLastTerm() {
-            return lastTerm;
-        }
-
-        public long getLastAppliedIndex() {
-            return lastAppliedIndex;
-        }
-
-        public long getLastAppliedTerm() {
-            return lastAppliedTerm;
-        }
-
-        public String getLogMessage() {
-            StringBuilder sb = new StringBuilder();
-            return sb.append("Snapshot={")
-                .append("lastTerm:" + this.getLastTerm()  + ", ")
-                .append("LastAppliedIndex:" + this.getLastAppliedIndex()  + ", ")
-                .append("LastAppliedTerm:" + this.getLastAppliedTerm()  + ", ")
-                .append("UnAppliedEntries size:" + this.getUnAppliedEntries().size()  + "}")
-                .toString();
-
-        }
-    }
-
     private class ElectionTermImpl implements ElectionTerm {
         /**
          * Identifier of the actor whose election term information this is
index e6e160bc02bf1fd72305325aefc91a1ac2a9fac0..c17f5448c6e256a97c4f7134959bb6c2d88a0971 100644 (file)
@@ -8,6 +8,8 @@
 
 package org.opendaylight.controller.cluster.raft;
 
+import com.google.protobuf.ByteString;
+
 import java.util.List;
 
 /**
@@ -118,7 +120,7 @@ public interface ReplicatedLog {
      *
      * @return an object representing the snapshot if it exists. null otherwise
      */
-    Object getSnapshot();
+    ByteString getSnapshot();
 
     /**
      * Get the index of the snapshot
@@ -134,4 +136,49 @@ public interface ReplicatedLog {
      * otherwise
      */
     long getSnapshotTerm();
+
+    /**
+     * sets the snapshot index in the replicated log
+     * @param snapshotIndex
+     */
+    void setSnapshotIndex(long snapshotIndex);
+
+    /**
+     * sets snapshot term
+     * @param snapshotTerm
+     */
+    public void setSnapshotTerm(long snapshotTerm);
+
+    /**
+     * sets the snapshot in bytes
+     * @param snapshot
+     */
+    public void setSnapshot(ByteString snapshot);
+
+    /**
+     * Clears the journal entries with startIndex(inclusive) and endIndex (exclusive)
+     * @param startIndex
+     * @param endIndex
+     */
+    public void clear(int startIndex, int endIndex);
+
+    /**
+     * Handles all the bookkeeping in order to perform a rollback in the
+     * event of SaveSnapshotFailure
+     * @param snapshot
+     * @param snapshotCapturedIndex
+     * @param snapshotCapturedTerm
+     */
+    public void snapshotPreCommit(ByteString snapshot,
+        long snapshotCapturedIndex, long snapshotCapturedTerm);
+
+    /**
+     * Sets the Replicated log to state after snapshot success.
+     */
+    public void snapshotCommit();
+
+    /**
+     * Restores the replicated log to a state in the event of a save snapshot failure
+     */
+    public void snapshotRollback();
 }
index 374e0fa9ba766cfe0417347fc4d6a4602d21fcdb..2f5ba48f9258d9a47ebf62ac98966db5d1055d99 100644 (file)
@@ -9,12 +9,16 @@
 package org.opendaylight.controller.cluster.raft;
 
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 
 public class SerializationUtils {
 
     public static Object fromSerializable(Object serializable){
         if(serializable.getClass().equals(AppendEntries.SERIALIZABLE_CLASS)){
             return AppendEntries.fromSerializable(serializable);
+
+        } else if (serializable.getClass().equals(InstallSnapshot.SERIALIZABLE_CLASS)) {
+            return InstallSnapshot.fromSerializable(serializable);
         }
         return serializable;
     }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java
new file mode 100644 (file)
index 0000000..8e0fcca
--- /dev/null
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import java.io.Serializable;
+import java.util.List;
+
+
+public class Snapshot implements Serializable {
+    private final byte[] state;
+    private final List<ReplicatedLogEntry> unAppliedEntries;
+    private final long lastIndex;
+    private final long lastTerm;
+    private final long lastAppliedIndex;
+    private final long lastAppliedTerm;
+
+    private Snapshot(byte[] state,
+        List<ReplicatedLogEntry> unAppliedEntries, long lastIndex,
+        long lastTerm, long lastAppliedIndex, long lastAppliedTerm) {
+        this.state = state;
+        this.unAppliedEntries = unAppliedEntries;
+        this.lastIndex = lastIndex;
+        this.lastTerm = lastTerm;
+        this.lastAppliedIndex = lastAppliedIndex;
+        this.lastAppliedTerm = lastAppliedTerm;
+    }
+
+
+    public static Snapshot create(byte[] state,
+        List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
+        long lastAppliedIndex, long lastAppliedTerm) {
+        return new Snapshot(state, entries, lastIndex, lastTerm,
+            lastAppliedIndex, lastAppliedTerm);
+    }
+
+    public byte[] getState() {
+        return state;
+    }
+
+    public List<ReplicatedLogEntry> getUnAppliedEntries() {
+        return unAppliedEntries;
+    }
+
+    public long getLastTerm() {
+        return lastTerm;
+    }
+
+    public long getLastAppliedIndex() {
+        return lastAppliedIndex;
+    }
+
+    public long getLastAppliedTerm() {
+        return lastAppliedTerm;
+    }
+
+    public long getLastIndex() {
+        return this.lastIndex;
+    }
+
+    public String getLogMessage() {
+        StringBuilder sb = new StringBuilder();
+        return sb.append("Snapshot={")
+            .append("lastTerm:" + this.getLastTerm() + ", ")
+            .append("lastIndex:" + this.getLastIndex()  + ", ")
+            .append("LastAppliedIndex:" + this.getLastAppliedIndex()  + ", ")
+            .append("LastAppliedTerm:" + this.getLastAppliedTerm()  + ", ")
+            .append("UnAppliedEntries size:" + this.getUnAppliedEntries().size()  + "}")
+            .toString();
+
+    }
+}
index 9739fb2f1b0ccb42e5fcd0446e5769194797be31..c356804223c696e83f10c73e17da35d63d419c85 100644 (file)
@@ -8,16 +8,21 @@
 
 package org.opendaylight.controller.cluster.raft.base.messages;
 
+import org.opendaylight.controller.cluster.raft.Snapshot;
+
 import java.io.Serializable;
 
+/**
+ * Internal message, issued by follower to its actor
+ */
 public class ApplySnapshot implements Serializable {
-    private final Object snapshot;
+    private final Snapshot snapshot;
 
-    public ApplySnapshot(Object snapshot) {
+    public ApplySnapshot(Snapshot snapshot) {
         this.snapshot = snapshot;
     }
 
-    public Object getSnapshot() {
+    public Snapshot getSnapshot() {
         return snapshot;
     }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java
new file mode 100644 (file)
index 0000000..bb86e1a
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+public class CaptureSnapshot {
+    private long lastAppliedIndex;
+    private long lastAppliedTerm;
+    private long lastIndex;
+    private long lastTerm;
+
+    public CaptureSnapshot(long lastIndex, long lastTerm,
+        long lastAppliedIndex, long lastAppliedTerm) {
+        this.lastIndex = lastIndex;
+        this.lastTerm = lastTerm;
+        this.lastAppliedIndex = lastAppliedIndex;
+        this.lastAppliedTerm = lastAppliedTerm;
+    }
+
+    public long getLastAppliedIndex() {
+        return lastAppliedIndex;
+    }
+
+    public long getLastAppliedTerm() {
+        return lastAppliedTerm;
+    }
+
+    public long getLastIndex() {
+        return lastIndex;
+    }
+
+    public long getLastTerm() {
+        return lastTerm;
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java
new file mode 100644 (file)
index 0000000..96150db
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+import com.google.protobuf.ByteString;
+
+public class CaptureSnapshotReply {
+    private ByteString snapshot;
+
+    public CaptureSnapshotReply(ByteString snapshot) {
+        this.snapshot = snapshot;
+    }
+
+    public ByteString getSnapshot() {
+        return snapshot;
+    }
+
+    public void setSnapshot(ByteString snapshot) {
+        this.snapshot = snapshot;
+    }
+}
index 251a13d583ec444ac4ca0c1cc028831feeb48958..7e896fed29c4889f6aec5ce39436a1970a50e03b 100644 (file)
@@ -305,6 +305,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      * @param index a log index that is known to be committed
      */
     protected void applyLogToStateMachine(final long index) {
+        long newLastApplied = context.getLastApplied();
         // Now maybe we apply to the state machine
         for (long i = context.getLastApplied() + 1;
              i < index + 1; i++) {
@@ -322,15 +323,19 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
             if (replicatedLogEntry != null) {
                 actor().tell(new ApplyState(clientActor, identifier,
                     replicatedLogEntry), actor());
+                newLastApplied = i;
             } else {
+                //if one index is not present in the log, no point in looping
+                // around as the rest wont be present either
                 context.getLogger().error(
-                    "Missing index " + i + " from log. Cannot apply state.");
+                    "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index );
+                break;
             }
         }
         // Send a local message to the local RaftActor (it's derived class to be
         // specific to apply the log to it's index)
-        context.getLogger().debug("Setting last applied to {}", index);
-        context.setLastApplied(index);
+        context.getLogger().debug("Setting last applied to {}", newLastApplied);
+        context.setLastApplied(newLastApplied);
     }
 
     protected Object fromSerializableMessage(Object serializable){
index 54e0494b9da65305729afcf00686e3102c31dd00..610fdc987fde7a1a51491ef25dc2f764011b9eda 100644 (file)
@@ -9,17 +9,22 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 
+import java.util.ArrayList;
+
 /**
  * The behavior of a RaftActor in the Follower state
  * <p/>
@@ -31,6 +36,8 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
  * </ul>
  */
 public class Follower extends AbstractRaftActorBehavior {
+    private ByteString snapshotChunksCollected = ByteString.EMPTY;
+
     public Follower(RaftActorContext context) {
         super(context);
 
@@ -106,6 +113,9 @@ public class Follower extends AbstractRaftActorBehavior {
         if (outOfSync) {
             // We found that the log was out of sync so just send a negative
             // reply and return
+            context.getLogger().debug("Follower is out-of-sync, " +
+                "so sending negative reply, lastIndex():{}, lastTerm():{}",
+                lastIndex(), lastTerm());
             sender.tell(
                 new AppendEntriesReply(context.getId(), currentTerm(), false,
                     lastIndex(), lastTerm()), actor()
@@ -191,7 +201,13 @@ public class Follower extends AbstractRaftActorBehavior {
 
         // If commitIndex > lastApplied: increment lastApplied, apply
         // log[lastApplied] to state machine (§5.3)
-        if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
+        // check if there are any entries to be applied. last-applied can be equal to last-index
+        if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
+            context.getLastApplied() < lastIndex()) {
+            context.getLogger().debug("applyLogToStateMachine, " +
+                "appendEntries.getLeaderCommit():{}," +
+                "context.getLastApplied():{}, lastIndex():{}",
+                appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex());
             applyLogToStateMachine(appendEntries.getLeaderCommit());
         }
 
@@ -234,7 +250,7 @@ public class Follower extends AbstractRaftActorBehavior {
 
         } else if (message instanceof InstallSnapshot) {
             InstallSnapshot installSnapshot = (InstallSnapshot) message;
-            actor().tell(new ApplySnapshot(installSnapshot.getData()), actor());
+            handleInstallSnapshot(sender, installSnapshot);
         }
 
         scheduleElection(electionDuration());
@@ -242,6 +258,47 @@ public class Follower extends AbstractRaftActorBehavior {
         return super.handleMessage(sender, message);
     }
 
+    private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
+        context.getLogger().debug("InstallSnapshot received by follower " +
+            "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(),
+            installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
+
+        try {
+            if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) {
+                // this is the last chunk, create a snapshot object and apply
+
+                snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+                context.getLogger().debug("Last chunk received: snapshotChunksCollected.size:{}",
+                    snapshotChunksCollected.size());
+
+                Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(),
+                    new ArrayList<ReplicatedLogEntry>(),
+                    installSnapshot.getLastIncludedIndex(),
+                    installSnapshot.getLastIncludedTerm(),
+                    installSnapshot.getLastIncludedIndex(),
+                    installSnapshot.getLastIncludedTerm());
+
+                actor().tell(new ApplySnapshot(snapshot), actor());
+
+            } else {
+                // we have more to go
+                snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+                context.getLogger().debug("Chunk={},snapshotChunksCollected.size:{}",
+                    installSnapshot.getChunkIndex(), snapshotChunksCollected.size());
+            }
+
+            sender.tell(new InstallSnapshotReply(
+                currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
+                true), actor());
+
+        } catch (Exception e) {
+            context.getLogger().error("Exception in InstallSnapshot of follower", e);
+            //send reply with success as false. The chunk will be sent again on failure
+            sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
+                installSnapshot.getChunkIndex(), false), actor());
+        }
+    }
+
     @Override public void close() throws Exception {
         stopElection();
     }
index 234f9db664e4d43e833e4e354e3d3045094dd381..90948ffef7d8a5e1341bb8aede6b03ccf8dae344 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
 import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
@@ -30,6 +31,7 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -64,8 +66,9 @@ import java.util.concurrent.atomic.AtomicLong;
 public class Leader extends AbstractRaftActorBehavior {
 
 
-    private final Map<String, FollowerLogInformation> followerToLog =
+    protected final Map<String, FollowerLogInformation> followerToLog =
         new HashMap();
+    protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
 
     private final Set<String> followers;
 
@@ -246,16 +249,48 @@ public class Leader extends AbstractRaftActorBehavior {
         return super.handleMessage(sender, message);
     }
 
-    private void handleInstallSnapshotReply(InstallSnapshotReply message) {
-        InstallSnapshotReply reply = message;
+    private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
         String followerId = reply.getFollowerId();
-        FollowerLogInformation followerLogInformation =
-            followerToLog.get(followerId);
+        FollowerToSnapshot followerToSnapshot =
+            mapFollowerToSnapshot.get(followerId);
+
+        if (followerToSnapshot != null &&
+            followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
+
+            if (reply.isSuccess()) {
+                if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
+                    //this was the last chunk reply
+                    context.getLogger().debug("InstallSnapshotReply received, " +
+                        "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
+                        reply.getChunkIndex(), followerId,
+                        context.getReplicatedLog().getSnapshotIndex() + 1);
+
+                    FollowerLogInformation followerLogInformation =
+                        followerToLog.get(followerId);
+                    followerLogInformation.setMatchIndex(
+                        context.getReplicatedLog().getSnapshotIndex());
+                    followerLogInformation.setNextIndex(
+                        context.getReplicatedLog().getSnapshotIndex() + 1);
+                    mapFollowerToSnapshot.remove(followerId);
+                    context.getLogger().debug("followerToLog.get(followerId).getNextIndex().get()=" +
+                        followerToLog.get(followerId).getNextIndex().get());
+
+                } else {
+                    followerToSnapshot.markSendStatus(true);
+                }
+            } else {
+                context.getLogger().info("InstallSnapshotReply received, " +
+                    "sending snapshot chunk failed, Will retry, Chunk:{}",
+                    reply.getChunkIndex());
+                followerToSnapshot.markSendStatus(false);
+            }
 
-        followerLogInformation
-            .setMatchIndex(context.getReplicatedLog().getSnapshotIndex());
-        followerLogInformation
-            .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1);
+        } else {
+            context.getLogger().error("ERROR!!" +
+                "FollowerId in InstallSnapshotReply not known to Leader" +
+                " or Chunk Index in InstallSnapshotReply not matching {} != {}",
+                followerToSnapshot.getChunkIndex(), reply.getChunkIndex() );
+        }
     }
 
     private void replicate(Replicate replicate) {
@@ -282,30 +317,56 @@ public class Leader extends AbstractRaftActorBehavior {
     private void sendAppendEntries() {
         // Send an AppendEntries to all followers
         for (String followerId : followers) {
-            ActorSelection followerActor =
-                context.getPeerActorSelection(followerId);
+            ActorSelection followerActor = context.getPeerActorSelection(followerId);
 
             if (followerActor != null) {
-                FollowerLogInformation followerLogInformation =
-                    followerToLog.get(followerId);
-
-                long nextIndex = followerLogInformation.getNextIndex().get();
-
+                FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+                long followerNextIndex = followerLogInformation.getNextIndex().get();
                 List<ReplicatedLogEntry> entries = Collections.emptyList();
 
-                if (context.getReplicatedLog().isPresent(nextIndex)) {
-                    // FIXME : Sending one entry at a time
-                    entries =
-                        context.getReplicatedLog().getFrom(nextIndex, 1);
+                if (mapFollowerToSnapshot.get(followerId) != null) {
+                    if (mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+                        sendSnapshotChunk(followerActor, followerId);
+                    }
+
+                } else {
+
+                    if (context.getReplicatedLog().isPresent(followerNextIndex)) {
+                        // FIXME : Sending one entry at a time
+                        entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
+
+                        followerActor.tell(
+                            new AppendEntries(currentTerm(), context.getId(),
+                                prevLogIndex(followerNextIndex),
+                                prevLogTerm(followerNextIndex), entries,
+                                context.getCommitIndex()).toSerializable(),
+                            actor()
+                        );
+
+                    } else {
+                        // if the followers next index is not present in the leaders log, then snapshot should be sent
+                        long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
+                        long leaderLastIndex = context.getReplicatedLog().lastIndex();
+                        if (followerNextIndex >= 0 && leaderLastIndex >= followerNextIndex ) {
+                            // if the follower is just not starting and leader's index
+                            // is more than followers index
+                            context.getLogger().debug("SendInstallSnapshot to follower:{}," +
+                                "follower-nextIndex:{}, leader-snapshot-index:{},  " +
+                                "leader-last-index:{}", followerId,
+                                followerNextIndex, leaderSnapShotIndex, leaderLastIndex);
+
+                            actor().tell(new SendInstallSnapshot(), actor());
+                        } else {
+                            followerActor.tell(
+                                new AppendEntries(currentTerm(), context.getId(),
+                                    prevLogIndex(followerNextIndex),
+                                    prevLogTerm(followerNextIndex), entries,
+                                    context.getCommitIndex()).toSerializable(),
+                                actor()
+                            );
+                        }
+                    }
                 }
-
-                followerActor.tell(
-                    new AppendEntries(currentTerm(), context.getId(),
-                        prevLogIndex(nextIndex),
-                        prevLogTerm(nextIndex), entries,
-                        context.getCommitIndex()).toSerializable(),
-                    actor()
-                );
             }
         }
     }
@@ -326,21 +387,55 @@ public class Leader extends AbstractRaftActorBehavior {
 
                 long nextIndex = followerLogInformation.getNextIndex().get();
 
-                if (!context.getReplicatedLog().isPresent(nextIndex) && context
-                    .getReplicatedLog().isInSnapshot(nextIndex)) {
-                    followerActor.tell(
-                        new InstallSnapshot(currentTerm(), context.getId(),
-                            context.getReplicatedLog().getSnapshotIndex(),
-                            context.getReplicatedLog().getSnapshotTerm(),
-                            context.getReplicatedLog().getSnapshot()
-                        ),
-                        actor()
-                    );
+                if (!context.getReplicatedLog().isPresent(nextIndex) &&
+                    context.getReplicatedLog().isInSnapshot(nextIndex)) {
+                    sendSnapshotChunk(followerActor, followerId);
                 }
             }
         }
     }
 
+    /**
+     *  Sends a snapshot chunk to a given follower
+     *  InstallSnapshot should qualify as a heartbeat too.
+     */
+    private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
+        try {
+            followerActor.tell(
+                new InstallSnapshot(currentTerm(), context.getId(),
+                    context.getReplicatedLog().getSnapshotIndex(),
+                    context.getReplicatedLog().getSnapshotTerm(),
+                    getNextSnapshotChunk(followerId,
+                        context.getReplicatedLog().getSnapshot()),
+                    mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
+                    mapFollowerToSnapshot.get(followerId).getTotalChunks()
+                ).toSerializable(),
+                actor()
+            );
+            context.getLogger().info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
+                followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
+                mapFollowerToSnapshot.get(followerId).getTotalChunks());
+        } catch (IOException e) {
+            context.getLogger().error("InstallSnapshot failed for Leader.", e);
+        }
+    }
+
+    /**
+     * Acccepts snaphot as ByteString, enters into map for future chunks
+     * creates and return a ByteString chunk
+     */
+    private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
+        FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+        if (followerToSnapshot == null) {
+            followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
+            mapFollowerToSnapshot.put(followerId, followerToSnapshot);
+        }
+        ByteString nextChunk = followerToSnapshot.getNextChunk();
+        context.getLogger().debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
+
+        return nextChunk;
+    }
+
     private RaftState sendHeartBeat() {
         if (followers.size() > 0) {
             sendAppendEntries();
@@ -410,4 +505,97 @@ public class Leader extends AbstractRaftActorBehavior {
         return context.getId();
     }
 
+    /**
+     * Encapsulates the snapshot bytestring and handles the logic of sending
+     * snapshot chunks
+     */
+    protected class FollowerToSnapshot {
+        private ByteString snapshotBytes;
+        private int offset = 0;
+        // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
+        private int replyReceivedForOffset;
+        // if replyStatus is false, the previous chunk is attempted
+        private boolean replyStatus = false;
+        private int chunkIndex;
+        private int totalChunks;
+
+        public FollowerToSnapshot(ByteString snapshotBytes) {
+            this.snapshotBytes = snapshotBytes;
+            replyReceivedForOffset = -1;
+            chunkIndex = 1;
+            int size = snapshotBytes.size();
+            totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
+                ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
+            context.getLogger().debug("Snapshot {} bytes, total chunks to send:{}",
+                size, totalChunks);
+        }
+
+        public ByteString getSnapshotBytes() {
+            return snapshotBytes;
+        }
+
+        public int incrementOffset() {
+            if(replyStatus) {
+                // if prev chunk failed, we would want to sent the same chunk again
+                offset = offset + context.getConfigParams().getSnapshotChunkSize();
+            }
+            return offset;
+        }
+
+        public int incrementChunkIndex() {
+            if (replyStatus) {
+                // if prev chunk failed, we would want to sent the same chunk again
+                chunkIndex =  chunkIndex + 1;
+            }
+            return chunkIndex;
+        }
+
+        public int getChunkIndex() {
+            return chunkIndex;
+        }
+
+        public int getTotalChunks() {
+            return totalChunks;
+        }
+
+        public boolean canSendNextChunk() {
+            // we only send a false if a chunk is sent but we have not received a reply yet
+            return replyReceivedForOffset == offset;
+        }
+
+        public boolean isLastChunk(int chunkIndex) {
+            return totalChunks == chunkIndex;
+        }
+
+        public void markSendStatus(boolean success) {
+            if (success) {
+                // if the chunk sent was successful
+                replyReceivedForOffset = offset;
+                replyStatus = true;
+            } else {
+                // if the chunk sent was failure
+                replyReceivedForOffset = offset;
+                replyStatus = false;
+            }
+        }
+
+        public ByteString getNextChunk() {
+            int snapshotLength = getSnapshotBytes().size();
+            int start = incrementOffset();
+            int size = context.getConfigParams().getSnapshotChunkSize();
+            if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
+                size = snapshotLength;
+            } else {
+                if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
+                    size = snapshotLength - start;
+                }
+            }
+
+            context.getLogger().debug("length={}, offset={},size={}",
+                snapshotLength, start, size);
+            return getSnapshotBytes().substring(start, start + size);
+
+        }
+    }
+
 }
index 888854fa71eaf4f745e11456c7b09349a7d8a443..9d40fa3d9edb3858969797e929776ddcba424333 100644 (file)
@@ -8,19 +8,29 @@
 
 package org.opendaylight.controller.cluster.raft.messages;
 
+import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages;
+
 public class InstallSnapshot extends AbstractRaftRPC {
 
+    public static final Class SERIALIZABLE_CLASS = InstallSnapshotMessages.InstallSnapshot.class;
+
     private final String leaderId;
     private final long lastIncludedIndex;
     private final long lastIncludedTerm;
-    private final Object data;
+    private final ByteString data;
+    private final int chunkIndex;
+    private final int totalChunks;
 
-    public InstallSnapshot(long term, String leaderId, long lastIncludedIndex, long lastIncludedTerm, Object data) {
+    public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
+        long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks) {
         super(term);
         this.leaderId = leaderId;
         this.lastIncludedIndex = lastIncludedIndex;
         this.lastIncludedTerm = lastIncludedTerm;
         this.data = data;
+        this.chunkIndex = chunkIndex;
+        this.totalChunks = totalChunks;
     }
 
     public String getLeaderId() {
@@ -35,7 +45,38 @@ public class InstallSnapshot extends AbstractRaftRPC {
         return lastIncludedTerm;
     }
 
-    public Object getData() {
+    public ByteString getData() {
         return data;
     }
+
+    public int getChunkIndex() {
+        return chunkIndex;
+    }
+
+    public int getTotalChunks() {
+        return totalChunks;
+    }
+
+    public <T extends Object> Object toSerializable(){
+        return InstallSnapshotMessages.InstallSnapshot.newBuilder()
+            .setLeaderId(this.getLeaderId())
+            .setChunkIndex(this.getChunkIndex())
+            .setData(this.getData())
+            .setLastIncludedIndex(this.getLastIncludedIndex())
+            .setLastIncludedTerm(this.getLastIncludedTerm())
+            .setTotalChunks(this.getTotalChunks()).build();
+
+    }
+
+    public static InstallSnapshot fromSerializable (Object o) {
+        InstallSnapshotMessages.InstallSnapshot from =
+            (InstallSnapshotMessages.InstallSnapshot) o;
+
+        InstallSnapshot installSnapshot = new InstallSnapshot(from.getTerm(),
+            from.getLeaderId(), from.getLastIncludedIndex(),
+            from.getLastIncludedTerm(), from.getData(),
+            from.getChunkIndex(), from.getTotalChunks());
+
+        return installSnapshot;
+    }
 }
index 85b89b70ae2b84c8ab26fcb7e16ba041fbd2040a..d293a47c8efdd1521afa4652f7a6a100e4c692ab 100644 (file)
@@ -13,13 +13,26 @@ public class InstallSnapshotReply extends AbstractRaftRPC {
     // The followerId - this will be used to figure out which follower is
     // responding
     private final String followerId;
+    private final int chunkIndex;
+    private boolean success;
 
-    protected InstallSnapshotReply(long term, String followerId) {
+    public InstallSnapshotReply(long term, String followerId, int chunkIndex,
+        boolean success) {
         super(term);
         this.followerId = followerId;
+        this.chunkIndex = chunkIndex;
+        this.success = success;
     }
 
     public String getFollowerId() {
         return followerId;
     }
+
+    public int getChunkIndex() {
+        return chunkIndex;
+    }
+
+    public boolean isSuccess() {
+        return success;
+    }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/InstallSnapshotMessages.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/InstallSnapshotMessages.java
new file mode 100644 (file)
index 0000000..e801ae1
--- /dev/null
@@ -0,0 +1,1015 @@
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: InstallSnapshot.proto
+
+package org.opendaylight.controller.cluster.raft.protobuff.messages;
+
+public final class InstallSnapshotMessages {
+  private InstallSnapshotMessages() {}
+  public static void registerAllExtensions(
+      com.google.protobuf.ExtensionRegistry registry) {
+  }
+  public interface InstallSnapshotOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+
+    // optional int64 term = 1;
+    /**
+     * <code>optional int64 term = 1;</code>
+     */
+    boolean hasTerm();
+    /**
+     * <code>optional int64 term = 1;</code>
+     */
+    long getTerm();
+
+    // optional string leaderId = 2;
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    boolean hasLeaderId();
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    java.lang.String getLeaderId();
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    com.google.protobuf.ByteString
+        getLeaderIdBytes();
+
+    // optional int64 lastIncludedIndex = 3;
+    /**
+     * <code>optional int64 lastIncludedIndex = 3;</code>
+     */
+    boolean hasLastIncludedIndex();
+    /**
+     * <code>optional int64 lastIncludedIndex = 3;</code>
+     */
+    long getLastIncludedIndex();
+
+    // optional int64 lastIncludedTerm = 4;
+    /**
+     * <code>optional int64 lastIncludedTerm = 4;</code>
+     */
+    boolean hasLastIncludedTerm();
+    /**
+     * <code>optional int64 lastIncludedTerm = 4;</code>
+     */
+    long getLastIncludedTerm();
+
+    // optional bytes data = 5;
+    /**
+     * <code>optional bytes data = 5;</code>
+     */
+    boolean hasData();
+    /**
+     * <code>optional bytes data = 5;</code>
+     */
+    com.google.protobuf.ByteString getData();
+
+    // optional int32 chunkIndex = 6;
+    /**
+     * <code>optional int32 chunkIndex = 6;</code>
+     */
+    boolean hasChunkIndex();
+    /**
+     * <code>optional int32 chunkIndex = 6;</code>
+     */
+    int getChunkIndex();
+
+    // optional int32 totalChunks = 7;
+    /**
+     * <code>optional int32 totalChunks = 7;</code>
+     */
+    boolean hasTotalChunks();
+    /**
+     * <code>optional int32 totalChunks = 7;</code>
+     */
+    int getTotalChunks();
+  }
+  /**
+   * Protobuf type {@code org.opendaylight.controller.cluster.raft.InstallSnapshot}
+   */
+  public static final class InstallSnapshot extends
+      com.google.protobuf.GeneratedMessage
+      implements InstallSnapshotOrBuilder {
+    // Use InstallSnapshot.newBuilder() to construct.
+    private InstallSnapshot(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+      super(builder);
+      this.unknownFields = builder.getUnknownFields();
+    }
+    private InstallSnapshot(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+    private static final InstallSnapshot defaultInstance;
+    public static InstallSnapshot getDefaultInstance() {
+      return defaultInstance;
+    }
+
+    public InstallSnapshot getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+
+    private final com.google.protobuf.UnknownFieldSet unknownFields;
+    @java.lang.Override
+    public final com.google.protobuf.UnknownFieldSet
+        getUnknownFields() {
+      return this.unknownFields;
+    }
+    private InstallSnapshot(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      initFields();
+      int mutable_bitField0_ = 0;
+      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder();
+      try {
+        boolean done = false;
+        while (!done) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              done = true;
+              break;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                done = true;
+              }
+              break;
+            }
+            case 8: {
+              bitField0_ |= 0x00000001;
+              term_ = input.readInt64();
+              break;
+            }
+            case 18: {
+              bitField0_ |= 0x00000002;
+              leaderId_ = input.readBytes();
+              break;
+            }
+            case 24: {
+              bitField0_ |= 0x00000004;
+              lastIncludedIndex_ = input.readInt64();
+              break;
+            }
+            case 32: {
+              bitField0_ |= 0x00000008;
+              lastIncludedTerm_ = input.readInt64();
+              break;
+            }
+            case 42: {
+              bitField0_ |= 0x00000010;
+              data_ = input.readBytes();
+              break;
+            }
+            case 48: {
+              bitField0_ |= 0x00000020;
+              chunkIndex_ = input.readInt32();
+              break;
+            }
+            case 56: {
+              bitField0_ |= 0x00000040;
+              totalChunks_ = input.readInt32();
+              break;
+            }
+          }
+        }
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        throw e.setUnfinishedMessage(this);
+      } catch (java.io.IOException e) {
+        throw new com.google.protobuf.InvalidProtocolBufferException(
+            e.getMessage()).setUnfinishedMessage(this);
+      } finally {
+        this.unknownFields = unknownFields.build();
+        makeExtensionsImmutable();
+      }
+    }
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+    }
+
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable
+          .ensureFieldAccessorsInitialized(
+              org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.Builder.class);
+    }
+
+    public static com.google.protobuf.Parser<InstallSnapshot> PARSER =
+        new com.google.protobuf.AbstractParser<InstallSnapshot>() {
+      public InstallSnapshot parsePartialFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return new InstallSnapshot(input, extensionRegistry);
+      }
+    };
+
+    @java.lang.Override
+    public com.google.protobuf.Parser<InstallSnapshot> getParserForType() {
+      return PARSER;
+    }
+
+    private int bitField0_;
+    // optional int64 term = 1;
+    public static final int TERM_FIELD_NUMBER = 1;
+    private long term_;
+    /**
+     * <code>optional int64 term = 1;</code>
+     */
+    public boolean hasTerm() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>optional int64 term = 1;</code>
+     */
+    public long getTerm() {
+      return term_;
+    }
+
+    // optional string leaderId = 2;
+    public static final int LEADERID_FIELD_NUMBER = 2;
+    private java.lang.Object leaderId_;
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    public boolean hasLeaderId() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    public java.lang.String getLeaderId() {
+      java.lang.Object ref = leaderId_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          leaderId_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    public com.google.protobuf.ByteString
+        getLeaderIdBytes() {
+      java.lang.Object ref = leaderId_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        leaderId_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
+    // optional int64 lastIncludedIndex = 3;
+    public static final int LASTINCLUDEDINDEX_FIELD_NUMBER = 3;
+    private long lastIncludedIndex_;
+    /**
+     * <code>optional int64 lastIncludedIndex = 3;</code>
+     */
+    public boolean hasLastIncludedIndex() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    /**
+     * <code>optional int64 lastIncludedIndex = 3;</code>
+     */
+    public long getLastIncludedIndex() {
+      return lastIncludedIndex_;
+    }
+
+    // optional int64 lastIncludedTerm = 4;
+    public static final int LASTINCLUDEDTERM_FIELD_NUMBER = 4;
+    private long lastIncludedTerm_;
+    /**
+     * <code>optional int64 lastIncludedTerm = 4;</code>
+     */
+    public boolean hasLastIncludedTerm() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    /**
+     * <code>optional int64 lastIncludedTerm = 4;</code>
+     */
+    public long getLastIncludedTerm() {
+      return lastIncludedTerm_;
+    }
+
+    // optional bytes data = 5;
+    public static final int DATA_FIELD_NUMBER = 5;
+    private com.google.protobuf.ByteString data_;
+    /**
+     * <code>optional bytes data = 5;</code>
+     */
+    public boolean hasData() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    /**
+     * <code>optional bytes data = 5;</code>
+     */
+    public com.google.protobuf.ByteString getData() {
+      return data_;
+    }
+
+    // optional int32 chunkIndex = 6;
+    public static final int CHUNKINDEX_FIELD_NUMBER = 6;
+    private int chunkIndex_;
+    /**
+     * <code>optional int32 chunkIndex = 6;</code>
+     */
+    public boolean hasChunkIndex() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    /**
+     * <code>optional int32 chunkIndex = 6;</code>
+     */
+    public int getChunkIndex() {
+      return chunkIndex_;
+    }
+
+    // optional int32 totalChunks = 7;
+    public static final int TOTALCHUNKS_FIELD_NUMBER = 7;
+    private int totalChunks_;
+    /**
+     * <code>optional int32 totalChunks = 7;</code>
+     */
+    public boolean hasTotalChunks() {
+      return ((bitField0_ & 0x00000040) == 0x00000040);
+    }
+    /**
+     * <code>optional int32 totalChunks = 7;</code>
+     */
+    public int getTotalChunks() {
+      return totalChunks_;
+    }
+
+    private void initFields() {
+      term_ = 0L;
+      leaderId_ = "";
+      lastIncludedIndex_ = 0L;
+      lastIncludedTerm_ = 0L;
+      data_ = com.google.protobuf.ByteString.EMPTY;
+      chunkIndex_ = 0;
+      totalChunks_ = 0;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+
+      memoizedIsInitialized = 1;
+      return true;
+    }
+
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeInt64(1, term_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBytes(2, getLeaderIdBytes());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeInt64(3, lastIncludedIndex_);
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeInt64(4, lastIncludedTerm_);
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeBytes(5, data_);
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeInt32(6, chunkIndex_);
+      }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        output.writeInt32(7, totalChunks_);
+      }
+      getUnknownFields().writeTo(output);
+    }
+
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(1, term_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(2, getLeaderIdBytes());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(3, lastIncludedIndex_);
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(4, lastIncludedTerm_);
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(5, data_);
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(6, chunkIndex_);
+      }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(7, totalChunks_);
+      }
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input, extensionRegistry);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    /**
+     * Protobuf type {@code org.opendaylight.controller.cluster.raft.InstallSnapshot}
+     */
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshotOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+      }
+
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable
+            .ensureFieldAccessorsInitialized(
+                org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.Builder.class);
+      }
+
+      // Construct using org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+
+      private Builder(
+          com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+
+      public Builder clear() {
+        super.clear();
+        term_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        leaderId_ = "";
+        bitField0_ = (bitField0_ & ~0x00000002);
+        lastIncludedIndex_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000004);
+        lastIncludedTerm_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000008);
+        data_ = com.google.protobuf.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000010);
+        chunkIndex_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000020);
+        totalChunks_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000040);
+        return this;
+      }
+
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+      }
+
+      public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot getDefaultInstanceForType() {
+        return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance();
+      }
+
+      public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot build() {
+        org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+
+      public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot buildPartial() {
+        org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot result = new org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.term_ = term_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.leaderId_ = leaderId_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.lastIncludedIndex_ = lastIncludedIndex_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.lastIncludedTerm_ = lastIncludedTerm_;
+        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.data_ = data_;
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.chunkIndex_ = chunkIndex_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000040;
+        }
+        result.totalChunks_ = totalChunks_;
+        result.bitField0_ = to_bitField0_;
+        onBuilt();
+        return result;
+      }
+
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot) {
+          return mergeFrom((org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+
+      public Builder mergeFrom(org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot other) {
+        if (other == org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance()) return this;
+        if (other.hasTerm()) {
+          setTerm(other.getTerm());
+        }
+        if (other.hasLeaderId()) {
+          bitField0_ |= 0x00000002;
+          leaderId_ = other.leaderId_;
+          onChanged();
+        }
+        if (other.hasLastIncludedIndex()) {
+          setLastIncludedIndex(other.getLastIncludedIndex());
+        }
+        if (other.hasLastIncludedTerm()) {
+          setLastIncludedTerm(other.getLastIncludedTerm());
+        }
+        if (other.hasData()) {
+          setData(other.getData());
+        }
+        if (other.hasChunkIndex()) {
+          setChunkIndex(other.getChunkIndex());
+        }
+        if (other.hasTotalChunks()) {
+          setTotalChunks(other.getTotalChunks());
+        }
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+
+      public final boolean isInitialized() {
+        return true;
+      }
+
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parsedMessage = null;
+        try {
+          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+          parsedMessage = (org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot) e.getUnfinishedMessage();
+          throw e;
+        } finally {
+          if (parsedMessage != null) {
+            mergeFrom(parsedMessage);
+          }
+        }
+        return this;
+      }
+      private int bitField0_;
+
+      // optional int64 term = 1;
+      private long term_ ;
+      /**
+       * <code>optional int64 term = 1;</code>
+       */
+      public boolean hasTerm() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      /**
+       * <code>optional int64 term = 1;</code>
+       */
+      public long getTerm() {
+        return term_;
+      }
+      /**
+       * <code>optional int64 term = 1;</code>
+       */
+      public Builder setTerm(long value) {
+        bitField0_ |= 0x00000001;
+        term_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 term = 1;</code>
+       */
+      public Builder clearTerm() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        term_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // optional string leaderId = 2;
+      private java.lang.Object leaderId_ = "";
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public boolean hasLeaderId() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public java.lang.String getLeaderId() {
+        java.lang.Object ref = leaderId_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          leaderId_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public com.google.protobuf.ByteString
+          getLeaderIdBytes() {
+        java.lang.Object ref = leaderId_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          leaderId_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public Builder setLeaderId(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        leaderId_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public Builder clearLeaderId() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        leaderId_ = getDefaultInstance().getLeaderId();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public Builder setLeaderIdBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        leaderId_ = value;
+        onChanged();
+        return this;
+      }
+
+      // optional int64 lastIncludedIndex = 3;
+      private long lastIncludedIndex_ ;
+      /**
+       * <code>optional int64 lastIncludedIndex = 3;</code>
+       */
+      public boolean hasLastIncludedIndex() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      /**
+       * <code>optional int64 lastIncludedIndex = 3;</code>
+       */
+      public long getLastIncludedIndex() {
+        return lastIncludedIndex_;
+      }
+      /**
+       * <code>optional int64 lastIncludedIndex = 3;</code>
+       */
+      public Builder setLastIncludedIndex(long value) {
+        bitField0_ |= 0x00000004;
+        lastIncludedIndex_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 lastIncludedIndex = 3;</code>
+       */
+      public Builder clearLastIncludedIndex() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        lastIncludedIndex_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // optional int64 lastIncludedTerm = 4;
+      private long lastIncludedTerm_ ;
+      /**
+       * <code>optional int64 lastIncludedTerm = 4;</code>
+       */
+      public boolean hasLastIncludedTerm() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      /**
+       * <code>optional int64 lastIncludedTerm = 4;</code>
+       */
+      public long getLastIncludedTerm() {
+        return lastIncludedTerm_;
+      }
+      /**
+       * <code>optional int64 lastIncludedTerm = 4;</code>
+       */
+      public Builder setLastIncludedTerm(long value) {
+        bitField0_ |= 0x00000008;
+        lastIncludedTerm_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 lastIncludedTerm = 4;</code>
+       */
+      public Builder clearLastIncludedTerm() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        lastIncludedTerm_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // optional bytes data = 5;
+      private com.google.protobuf.ByteString data_ = com.google.protobuf.ByteString.EMPTY;
+      /**
+       * <code>optional bytes data = 5;</code>
+       */
+      public boolean hasData() {
+        return ((bitField0_ & 0x00000010) == 0x00000010);
+      }
+      /**
+       * <code>optional bytes data = 5;</code>
+       */
+      public com.google.protobuf.ByteString getData() {
+        return data_;
+      }
+      /**
+       * <code>optional bytes data = 5;</code>
+       */
+      public Builder setData(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000010;
+        data_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bytes data = 5;</code>
+       */
+      public Builder clearData() {
+        bitField0_ = (bitField0_ & ~0x00000010);
+        data_ = getDefaultInstance().getData();
+        onChanged();
+        return this;
+      }
+
+      // optional int32 chunkIndex = 6;
+      private int chunkIndex_ ;
+      /**
+       * <code>optional int32 chunkIndex = 6;</code>
+       */
+      public boolean hasChunkIndex() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      /**
+       * <code>optional int32 chunkIndex = 6;</code>
+       */
+      public int getChunkIndex() {
+        return chunkIndex_;
+      }
+      /**
+       * <code>optional int32 chunkIndex = 6;</code>
+       */
+      public Builder setChunkIndex(int value) {
+        bitField0_ |= 0x00000020;
+        chunkIndex_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 chunkIndex = 6;</code>
+       */
+      public Builder clearChunkIndex() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        chunkIndex_ = 0;
+        onChanged();
+        return this;
+      }
+
+      // optional int32 totalChunks = 7;
+      private int totalChunks_ ;
+      /**
+       * <code>optional int32 totalChunks = 7;</code>
+       */
+      public boolean hasTotalChunks() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      /**
+       * <code>optional int32 totalChunks = 7;</code>
+       */
+      public int getTotalChunks() {
+        return totalChunks_;
+      }
+      /**
+       * <code>optional int32 totalChunks = 7;</code>
+       */
+      public Builder setTotalChunks(int value) {
+        bitField0_ |= 0x00000040;
+        totalChunks_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 totalChunks = 7;</code>
+       */
+      public Builder clearTotalChunks() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        totalChunks_ = 0;
+        onChanged();
+        return this;
+      }
+
+      // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.cluster.raft.InstallSnapshot)
+    }
+
+    static {
+      defaultInstance = new InstallSnapshot(true);
+      defaultInstance.initFields();
+    }
+
+    // @@protoc_insertion_point(class_scope:org.opendaylight.controller.cluster.raft.InstallSnapshot)
+  }
+
+  private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable;
+
+  public static com.google.protobuf.Descriptors.FileDescriptor
+      getDescriptor() {
+    return descriptor;
+  }
+  private static com.google.protobuf.Descriptors.FileDescriptor
+      descriptor;
+  static {
+    java.lang.String[] descriptorData = {
+      "\n\025InstallSnapshot.proto\022(org.opendayligh" +
+      "t.controller.cluster.raft\"\235\001\n\017InstallSna" +
+      "pshot\022\014\n\004term\030\001 \001(\003\022\020\n\010leaderId\030\002 \001(\t\022\031\n" +
+      "\021lastIncludedIndex\030\003 \001(\003\022\030\n\020lastIncluded" +
+      "Term\030\004 \001(\003\022\014\n\004data\030\005 \001(\014\022\022\n\nchunkIndex\030\006" +
+      " \001(\005\022\023\n\013totalChunks\030\007 \001(\005BX\n;org.openday" +
+      "light.controller.cluster.raft.protobuff." +
+      "messagesB\027InstallSnapshotMessagesH\001"
+    };
+    com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
+      new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
+        public com.google.protobuf.ExtensionRegistry assignDescriptors(
+            com.google.protobuf.Descriptors.FileDescriptor root) {
+          descriptor = root;
+          internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor =
+            getDescriptor().getMessageTypes().get(0);
+          internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor,
+              new java.lang.String[] { "Term", "LeaderId", "LastIncludedIndex", "LastIncludedTerm", "Data", "ChunkIndex", "TotalChunks", });
+          return null;
+        }
+      };
+    com.google.protobuf.Descriptors.FileDescriptor
+      .internalBuildGeneratedFileFrom(descriptorData,
+        new com.google.protobuf.Descriptors.FileDescriptor[] {
+        }, assigner);
+  }
+
+  // @@protoc_insertion_point(outer_class_scope)
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/resources/InstallSnapshot.proto b/opendaylight/md-sal/sal-akka-raft/src/main/resources/InstallSnapshot.proto
new file mode 100644 (file)
index 0000000..14f821b
--- /dev/null
@@ -0,0 +1,15 @@
+package org.opendaylight.controller.cluster.raft;
+
+option java_package = "org.opendaylight.controller.cluster.raft.protobuff.messages";
+option java_outer_classname = "InstallSnapshotMessages";
+option optimize_for = SPEED;
+
+message InstallSnapshot {
+    optional int64 term = 1;
+    optional string leaderId = 2;
+    optional int64 lastIncludedIndex = 3;
+    optional int64 lastIncludedTerm = 4;
+    optional bytes data = 5;
+    optional int32 chunkIndex = 6;
+    optional int32 totalChunks = 7;
+}
index ea3c9e759d51744fb2dd36212bf8372e2f06f0c8..ca34a34ca49337783ec7fda8a9b9966fba1f16ac 100644 (file)
@@ -14,17 +14,14 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
+import com.google.common.base.Preconditions;
 import com.google.protobuf.GeneratedMessage;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
-import com.google.common.base.Preconditions;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 public class MockRaftActorContext implements RaftActorContext {
@@ -37,6 +34,7 @@ public class MockRaftActorContext implements RaftActorContext {
     private final ElectionTerm electionTerm;
     private ReplicatedLog replicatedLog;
     private Map<String, String> peerAddresses = new HashMap();
+    private ConfigParams configParams;
 
     public MockRaftActorContext(){
         electionTerm = null;
@@ -79,6 +77,8 @@ public class MockRaftActorContext implements RaftActorContext {
             }
         };
 
+        configParams = new DefaultConfigParamsImpl();
+
         initReplicatedLog();
     }
 
@@ -179,118 +179,21 @@ public class MockRaftActorContext implements RaftActorContext {
 
     @Override
     public ConfigParams getConfigParams() {
-        return new DefaultConfigParamsImpl();
+        return configParams;
     }
 
-    public static class SimpleReplicatedLog implements ReplicatedLog {
-        private final List<ReplicatedLogEntry> log = new ArrayList<>();
-
-        @Override public ReplicatedLogEntry get(long index) {
-            if(index >= log.size() || index < 0){
-                return null;
-            }
-            return log.get((int) index);
-        }
-
-        @Override public ReplicatedLogEntry last() {
-            if(log.size() == 0){
-                return null;
-            }
-            return log.get(log.size()-1);
-        }
-
-        @Override public long lastIndex() {
-            if(log.size() == 0){
-                return -1;
-            }
-
-            return last().getIndex();
-        }
-
-        @Override public long lastTerm() {
-            if(log.size() == 0){
-                return -1;
-            }
-
-            return last().getTerm();
-        }
-
-        @Override public void removeFrom(long index) {
-            if(index >= log.size() || index < 0){
-                return;
-            }
-
-            log.subList((int) index, log.size()).clear();
-            //log.remove((int) index);
-        }
-
-        @Override public void removeFromAndPersist(long index) {
-            removeFrom(index);
-        }
-
-        @Override public void append(ReplicatedLogEntry replicatedLogEntry) {
-            log.add(replicatedLogEntry);
-        }
+    public void setConfigParams(ConfigParams configParams) {
+        this.configParams = configParams;
+    }
 
+    public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
         @Override public void appendAndPersist(
             ReplicatedLogEntry replicatedLogEntry) {
             append(replicatedLogEntry);
         }
 
-        @Override public List<ReplicatedLogEntry> getFrom(long index) {
-            if(index >= log.size() || index < 0){
-                return Collections.EMPTY_LIST;
-            }
-            List<ReplicatedLogEntry> entries = new ArrayList<>();
-            for(int i=(int) index ; i < log.size() ; i++) {
-                entries.add(get(i));
-            }
-            return entries;
-        }
-
-        @Override public List<ReplicatedLogEntry> getFrom(long index, int max) {
-            if(index >= log.size() || index < 0){
-                return Collections.EMPTY_LIST;
-            }
-            List<ReplicatedLogEntry> entries = new ArrayList<>();
-            int maxIndex = (int) index + max;
-            if(maxIndex > log.size()){
-                maxIndex = log.size();
-            }
-
-            for(int i=(int) index ; i < maxIndex ; i++) {
-                entries.add(get(i));
-            }
-            return entries;
-
-        }
-
-        @Override public long size() {
-            return log.size();
-        }
-
-        @Override public boolean isPresent(long index) {
-            if(index >= log.size() || index < 0){
-                return false;
-            }
-
-            return true;
-        }
-
-        @Override public boolean isInSnapshot(long index) {
-            return false;
-        }
-
-        @Override public Object getSnapshot() {
-            return null;
-        }
-
-        @Override public long getSnapshotIndex() {
-            return -1;
-        }
-
-        @Override public long getSnapshotTerm() {
-            return -1;
+        @Override public void removeFromAndPersist(long index) {
+            removeFrom(index);
         }
     }
 
index ff0ffeb271b55b38455901d03cb31871620b078c..12123db12995061901a39a264c79f0237d78d00a 100644 (file)
@@ -6,6 +6,7 @@ import akka.actor.Props;
 import akka.event.Logging;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
+import com.google.protobuf.ByteString;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
@@ -39,11 +40,11 @@ public class RaftActorTest extends AbstractActorTest {
             Object data) {
         }
 
-        @Override protected Object createSnapshot() {
+        @Override protected void createSnapshot() {
             throw new UnsupportedOperationException("createSnapshot");
         }
 
-        @Override protected void applySnapshot(Object snapshot) {
+        @Override protected void applySnapshot(ByteString snapshot) {
             throw new UnsupportedOperationException("applySnapshot");
         }
 
index c5a81aa1c9225ea03fa548bf1950d5e73a7e3329..227d1effa7e9b8b3bb65932fe8c25b1a2eecdbf5 100644 (file)
@@ -158,17 +158,18 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 createActorContext();
 
             context.setLastApplied(100);
-            setLastLogEntry((MockRaftActorContext) context, 0, 0, new MockRaftActorContext.MockPayload(""));
+            setLastLogEntry((MockRaftActorContext) context, 1, 100, new MockRaftActorContext.MockPayload(""));
+            ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99);
 
             List<ReplicatedLogEntry> entries =
                 Arrays.asList(
-                    (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(100, 101,
+                    (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101,
                         new MockRaftActorContext.MockPayload("foo"))
                 );
 
             // The new commitIndex is 101
             AppendEntries appendEntries =
-                new AppendEntries(100, "leader-1", 0, 0, entries, 101);
+                new AppendEntries(2, "leader-1", 100, 1, entries, 101);
 
             RaftState raftState =
                 createBehavior(context).handleMessage(getRef(), appendEntries);
index 17c22a134a9a7f26e08998930b2609b128f40c21..73c9f96b82a0a582f4cf5e61b5d68c488f9bc198 100644 (file)
@@ -1,24 +1,40 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
-import junit.framework.Assert;
+import com.google.protobuf.ByteString;
+import org.junit.Assert;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
+import org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
@@ -82,8 +98,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     assertEquals("match", out);
 
                 }
-
-
             };
         }};
     }
@@ -194,18 +208,372 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     assertEquals("match", out);
 
                 }
+            };
+        }};
+    }
+
+    @Test
+    public void testSendInstallSnapshot() {
+        new LeaderTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+                    ActorRef followerActor = getTestActor();
+
+                    Map<String, String> peerAddresses = new HashMap();
+                    peerAddresses.put(followerActor.path().toString(),
+                        followerActor.path().toString());
+
+
+                    MockRaftActorContext actorContext =
+                        (MockRaftActorContext) createActorContext(getRef());
+                    actorContext.setPeerAddresses(peerAddresses);
+
+
+                    Map<String, String> leadersSnapshot = new HashMap<>();
+                    leadersSnapshot.put("1", "A");
+                    leadersSnapshot.put("2", "B");
+                    leadersSnapshot.put("3", "C");
+
+                    //clears leaders log
+                    actorContext.getReplicatedLog().removeFrom(0);
+
+                    final int followersLastIndex = 2;
+                    final int snapshotIndex = 3;
+                    final int newEntryIndex = 4;
+                    final int snapshotTerm = 1;
+                    final int currentTerm = 2;
+
+                    // set the snapshot variables in replicatedlog
+                    actorContext.getReplicatedLog().setSnapshot(
+                        toByteString(leadersSnapshot));
+                    actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+                    actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+
+                    MockLeader leader = new MockLeader(actorContext);
+                    // set the follower info in leader
+                    leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
+
+                    // new entry
+                    ReplicatedLogImplEntry entry =
+                        new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                            new MockRaftActorContext.MockPayload("D"));
+
+                    // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
+                    RaftState raftState = leader.handleMessage(
+                        senderActor, new Replicate(null, "state-id", entry));
+
+                    assertEquals(RaftState.Leader, raftState);
+
+                    // we might receive some heartbeat messages, so wait till we SendInstallSnapshot
+                    Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
+                        @Override
+                        protected Boolean match(Object o) throws Exception {
+                            if (o instanceof SendInstallSnapshot) {
+                                return true;
+                            }
+                            return false;
+                        }
+                    }.get();
+
+                    boolean sendInstallSnapshotReceived = false;
+                    for (Boolean b: matches) {
+                        sendInstallSnapshotReceived = b | sendInstallSnapshotReceived;
+                    }
+
+                    assertTrue(sendInstallSnapshotReceived);
+
+                }
+            };
+        }};
+    }
+
+    @Test
+    public void testInstallSnapshot() {
+        new LeaderTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+                    ActorRef followerActor = getTestActor();
+
+                    Map<String, String> peerAddresses = new HashMap();
+                    peerAddresses.put(followerActor.path().toString(),
+                        followerActor.path().toString());
+
+                    MockRaftActorContext actorContext =
+                        (MockRaftActorContext) createActorContext();
+                    actorContext.setPeerAddresses(peerAddresses);
+
+
+                    Map<String, String> leadersSnapshot = new HashMap<>();
+                    leadersSnapshot.put("1", "A");
+                    leadersSnapshot.put("2", "B");
+                    leadersSnapshot.put("3", "C");
+
+                    //clears leaders log
+                    actorContext.getReplicatedLog().removeFrom(0);
+
+                    final int followersLastIndex = 2;
+                    final int snapshotIndex = 3;
+                    final int newEntryIndex = 4;
+                    final int snapshotTerm = 1;
+                    final int currentTerm = 2;
+
+                    // set the snapshot variables in replicatedlog
+                    actorContext.getReplicatedLog().setSnapshot(toByteString(leadersSnapshot));
+                    actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+                    actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+
+                    actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+                    MockLeader leader = new MockLeader(actorContext);
+                    // set the follower info in leader
+                    leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
+
+                    // new entry
+                    ReplicatedLogImplEntry entry =
+                        new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                            new MockRaftActorContext.MockPayload("D"));
+
+
+                    RaftState raftState = leader.handleMessage(senderActor, new SendInstallSnapshot());
+
+                    assertEquals(RaftState.Leader, raftState);
+
+                    // check if installsnapshot gets called with the correct values.
+                    final String out =
+                        new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+                            // do not put code outside this method, will run afterwards
+                            protected String match(Object in) {
+                                if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
+                                    InstallSnapshot is = (InstallSnapshot)
+                                        SerializationUtils.fromSerializable(in);
+                                    if (is.getData() == null) {
+                                        return "InstallSnapshot data is null";
+                                    }
+                                    if (is.getLastIncludedIndex() != snapshotIndex) {
+                                        return is.getLastIncludedIndex() + "!=" + snapshotIndex;
+                                    }
+                                    if (is.getLastIncludedTerm() != snapshotTerm) {
+                                        return is.getLastIncludedTerm() + "!=" + snapshotTerm;
+                                    }
+                                    if (is.getTerm() == currentTerm) {
+                                        return is.getTerm() + "!=" + currentTerm;
+                                    }
+
+                                    return "match";
+
+                               } else {
+                                    return "message mismatch:" + in.getClass();
+                                }
+                            }
+                        }.get(); // this extracts the received message
+
+                    assertEquals("match", out);
+                }
+            };
+        }};
+    }
+
+    @Test
+    public void testHandleInstallSnapshotReplyLastChunk() {
+        new LeaderTestKit(getSystem()) {{
+            new Within(duration("1 seconds")) {
+                protected void run() {
+                    ActorRef followerActor = getTestActor();
+
+                    Map<String, String> peerAddresses = new HashMap();
+                    peerAddresses.put(followerActor.path().toString(),
+                        followerActor.path().toString());
+
+                    MockRaftActorContext actorContext =
+                        (MockRaftActorContext) createActorContext();
+                    actorContext.setPeerAddresses(peerAddresses);
+
+                    final int followersLastIndex = 2;
+                    final int snapshotIndex = 3;
+                    final int newEntryIndex = 4;
+                    final int snapshotTerm = 1;
+                    final int currentTerm = 2;
+
+                    MockLeader leader = new MockLeader(actorContext);
+                    // set the follower info in leader
+                    leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
+
+                    Map<String, String> leadersSnapshot = new HashMap<>();
+                    leadersSnapshot.put("1", "A");
+                    leadersSnapshot.put("2", "B");
+                    leadersSnapshot.put("3", "C");
+
+                    // set the snapshot variables in replicatedlog
+                    actorContext.getReplicatedLog().setSnapshot(
+                        toByteString(leadersSnapshot));
+                    actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+                    actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+                    actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+                    ByteString bs = toByteString(leadersSnapshot);
+                    leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
+                    while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
+                        leader.getFollowerToSnapshot().getNextChunk();
+                        leader.getFollowerToSnapshot().incrementChunkIndex();
+                    }
+
+                    //clears leaders log
+                    actorContext.getReplicatedLog().removeFrom(0);
 
+                    RaftState raftState = leader.handleMessage(senderActor,
+                        new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
+                            leader.getFollowerToSnapshot().getChunkIndex(), true));
 
+                    assertEquals(RaftState.Leader, raftState);
+
+                    assertEquals(leader.mapFollowerToSnapshot.size(), 0);
+                    assertEquals(leader.followerToLog.size(), 1);
+                    assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
+                    FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
+                    assertEquals(snapshotIndex, fli.getMatchIndex().get());
+                    assertEquals(snapshotIndex, fli.getMatchIndex().get());
+                    assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
+                }
             };
         }};
     }
 
+    @Test
+    public void testFollowerToSnapshotLogic() {
+
+        MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
+
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public int getSnapshotChunkSize() {
+                return 50;
+            }
+        });
+
+        MockLeader leader = new MockLeader(actorContext);
+
+        Map<String, String> leadersSnapshot = new HashMap<>();
+        leadersSnapshot.put("1", "A");
+        leadersSnapshot.put("2", "B");
+        leadersSnapshot.put("3", "C");
+
+        ByteString bs = toByteString(leadersSnapshot);
+        byte[] barray = bs.toByteArray();
+
+        leader.createFollowerToSnapshot("followerId", bs);
+        assertEquals(bs.size(), barray.length);
+
+        int chunkIndex=0;
+        for (int i=0; i < barray.length; i = i + 50) {
+            int j = i + 50;
+            chunkIndex++;
+
+            if (i + 50 > barray.length) {
+                j = barray.length;
+            }
+
+            ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
+            assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
+            assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
+
+            leader.getFollowerToSnapshot().markSendStatus(true);
+            if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
+                leader.getFollowerToSnapshot().incrementChunkIndex();
+            }
+        }
+
+        assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
+    }
+
+
     @Override protected RaftActorBehavior createBehavior(
         RaftActorContext actorContext) {
         return new Leader(actorContext);
     }
 
     @Override protected RaftActorContext createActorContext() {
-        return new MockRaftActorContext("test", getSystem(), leaderActor);
+        return createActorContext(leaderActor);
+    }
+
+    protected RaftActorContext createActorContext(ActorRef actorRef) {
+        return new MockRaftActorContext("test", getSystem(), actorRef);
+    }
+
+    private ByteString toByteString(Map<String, String> state) {
+        ByteArrayOutputStream b = null;
+        ObjectOutputStream o = null;
+        try {
+            try {
+                b = new ByteArrayOutputStream();
+                o = new ObjectOutputStream(b);
+                o.writeObject(state);
+                byte[] snapshotBytes = b.toByteArray();
+                return ByteString.copyFrom(snapshotBytes);
+            } finally {
+                if (o != null) {
+                    o.flush();
+                    o.close();
+                }
+                if (b != null) {
+                    b.close();
+                }
+            }
+        } catch (IOException e) {
+            Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
+        }
+        return null;
+    }
+
+    private static class LeaderTestKit extends JavaTestKit {
+
+        private LeaderTestKit(ActorSystem actorSystem) {
+            super(actorSystem);
+        }
+
+        protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
+            // Wait for a specific log message to show up
+            final boolean result =
+            new JavaTestKit.EventFilter<Boolean>(logLevel
+            ) {
+                @Override
+                protected Boolean run() {
+                    return true;
+                }
+            }.from(subject.path().toString())
+                .message(logMessage)
+                .occurrences(1).exec();
+
+            Assert.assertEquals(true, result);
+
+        }
+    }
+
+    class MockLeader extends Leader {
+
+        FollowerToSnapshot fts;
+
+        public MockLeader(RaftActorContext context){
+            super(context);
+        }
+
+        public void addToFollowerToLog(String followerId, long nextIndex, long matchIndex) {
+            FollowerLogInformation followerLogInformation =
+                new FollowerLogInformationImpl(followerId,
+                    new AtomicLong(nextIndex),
+                    new AtomicLong(matchIndex));
+            followerToLog.put(followerId, followerLogInformation);
+        }
+
+        public FollowerToSnapshot getFollowerToSnapshot() {
+            return fts;
+        }
+
+        public void createFollowerToSnapshot(String followerId, ByteString bs ) {
+            fts = new FollowerToSnapshot(bs);
+            mapFollowerToSnapshot.put(followerId, fts);
+
+        }
     }
 }
index 2d8e51cff9d1a1c00e50a03aa671a9d97fce133e..d16170ba481d7d1a295a6acdcf33ae83b73b8a7d 100644 (file)
@@ -7,11 +7,10 @@
  */
 package org.opendaylight.controller.md.sal.binding.impl;
 
-import java.util.Map;
-import java.util.WeakHashMap;
-
-import javax.annotation.concurrent.GuardedBy;
-
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
@@ -19,6 +18,7 @@ import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
@@ -26,20 +26,19 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
 import org.opendaylight.yangtools.concepts.Delegator;
 
-import com.google.common.base.Preconditions;
-
 class BindingTranslatedTransactionChain implements BindingTransactionChain, Delegator<DOMTransactionChain> {
 
     private final DOMTransactionChain delegate;
-
-    @GuardedBy("this")
-    private final Map<AsyncTransaction<?, ?>, AsyncTransaction<?, ?>> delegateTxToBindingTx = new WeakHashMap<>();
     private final BindingToNormalizedNodeCodec codec;
+    private final DelegateChainListener delegatingListener;
+    private final TransactionChainListener listener;
 
     public BindingTranslatedTransactionChain(final DOMDataBroker chainFactory,
             final BindingToNormalizedNodeCodec codec, final TransactionChainListener listener) {
         Preconditions.checkNotNull(chainFactory, "DOM Transaction chain factory must not be null");
-        this.delegate = chainFactory.createTransactionChain(new ListenerInvoker(listener));
+        this.delegatingListener = new DelegateChainListener();
+        this.listener = listener;
+        this.delegate = chainFactory.createTransactionChain(listener);
         this.codec = codec;
     }
 
@@ -52,56 +51,79 @@ class BindingTranslatedTransactionChain implements BindingTransactionChain, Dele
     public ReadOnlyTransaction newReadOnlyTransaction() {
         DOMDataReadOnlyTransaction delegateTx = delegate.newReadOnlyTransaction();
         ReadOnlyTransaction bindingTx = new BindingDataReadTransactionImpl(delegateTx, codec);
-        putDelegateToBinding(delegateTx, bindingTx);
         return bindingTx;
     }
 
     @Override
     public ReadWriteTransaction newReadWriteTransaction() {
         DOMDataReadWriteTransaction delegateTx = delegate.newReadWriteTransaction();
-        ReadWriteTransaction bindingTx = new BindingDataReadWriteTransactionImpl(delegateTx, codec);
-        putDelegateToBinding(delegateTx, bindingTx);
+        ReadWriteTransaction bindingTx = new BindingDataReadWriteTransactionImpl(delegateTx, codec) {
+
+            @Override
+            public CheckedFuture<Void, TransactionCommitFailedException> submit() {
+                return listenForFailure(this,super.submit());
+            }
+
+        };
         return bindingTx;
     }
 
     @Override
     public WriteTransaction newWriteOnlyTransaction() {
-        DOMDataWriteTransaction delegateTx = delegate.newWriteOnlyTransaction();
-        WriteTransaction bindingTx = new BindingDataWriteTransactionImpl<>(delegateTx, codec);
-        putDelegateToBinding(delegateTx, bindingTx);
+        final DOMDataWriteTransaction delegateTx = delegate.newWriteOnlyTransaction();
+        WriteTransaction bindingTx = new BindingDataWriteTransactionImpl<DOMDataWriteTransaction>(delegateTx, codec) {
+
+            @Override
+            public CheckedFuture<Void,TransactionCommitFailedException> submit() {
+                return listenForFailure(this,super.submit());
+            };
+
+        };
         return bindingTx;
     }
 
-    @Override
-    public void close() {
-        delegate.close();
+    protected CheckedFuture<Void, TransactionCommitFailedException> listenForFailure(
+            final WriteTransaction tx, CheckedFuture<Void, TransactionCommitFailedException> future) {
+        Futures.addCallback(future, new FutureCallback<Void>() {
+            @Override
+            public void onFailure(Throwable t) {
+                failTransactionChain(tx,t);
+            }
+
+            @Override
+            public void onSuccess(Void result) {
+                // Intentionally NOOP
+            }
+        });
+
+        return future;
     }
 
-    private synchronized void putDelegateToBinding(final AsyncTransaction<?, ?> domTx,
-            final AsyncTransaction<?, ?> bindingTx) {
-        final Object previous = delegateTxToBindingTx.put(domTx, bindingTx);
-        Preconditions.checkState(previous == null, "DOM Transaction %s has already associated binding transation %s",domTx,previous);
+    protected void failTransactionChain(WriteTransaction tx, Throwable t) {
+        // We asume correct state change for underlaying transaction
+        // chain, so we are not changing any of our internal state
+        // to mark that we failed.
+        this.delegatingListener.onTransactionChainFailed(this, tx, t);
     }
 
-    private synchronized AsyncTransaction<?, ?> getBindingTransaction(final AsyncTransaction<?, ?> transaction) {
-        return delegateTxToBindingTx.get(transaction);
+    @Override
+    public void close() {
+        delegate.close();
     }
 
-    private final class ListenerInvoker implements TransactionChainListener {
-
-        private final TransactionChainListener listener;
-
-        public ListenerInvoker(final TransactionChainListener listener) {
-            this.listener = Preconditions.checkNotNull(listener, "Listener must not be null.");
-        }
+    private final class DelegateChainListener implements TransactionChainListener {
 
         @Override
         public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
                 final AsyncTransaction<?, ?> transaction, final Throwable cause) {
-            Preconditions.checkState(delegate.equals(chain),
-                    "Illegal state - listener for %s was invoked for incorrect chain %s.", delegate, chain);
-            AsyncTransaction<?, ?> bindingTx = getBindingTransaction(transaction);
-            listener.onTransactionChainFailed(chain, bindingTx, cause);
+            /*
+             * Intentionally NOOP, callback for failure, since we
+             * are also listening on each transaction for failure.
+             *
+             * by listening on submit future for Binding transaction
+             * in order to provide Binding transaction (which was seen by client
+             * of this transaction chain, instead of
+            */
         }
 
         @Override
index b8980cd0bea46b22676ccec1cb38def91239d275..a3619ec4d230463edcbf7e15957b230db9d0cb09 100644 (file)
       </dependency>
   </dependencies>
   <build>
+
       <plugins>
           <plugin>
               <groupId>org.jacoco</groupId>
                   </execution>
               </executions>
           </plugin>
-      </plugins>
+          <plugin>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>maven-bundle-plugin</artifactId>
+            <extensions>true</extensions>
+            <configuration>
+            <instructions>
+                <Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
+                <Export-Package>org.opendaylight.controller.cluster.*,org.opendaylight.common.actor,org.opendaylight.common.reporting,org.opendaylight.controller.protobuff.*,org.opendaylight.controller.xml.*</Export-Package>
+                <Import-Package>*</Import-Package>
+            </instructions>
+            </configuration>
+          </plugin>
+    </plugins>
   </build>
-
 </project>
index 31b658d1d7ea6f618f8277e4c5d214124d4613d6..91c0b5caa1ebd0854f44943164d9f9a89aec855d 100644 (file)
                   <type>xml</type>
                   <classifier>config</classifier>
                 </artifact>
+                <artifact>
+                  <file>${project.build.directory}/classes/initial/akka.conf</file>
+                  <type>xml</type>
+                  <classifier>akkaconf</classifier>
+                </artifact>
+                <artifact>
+                  <file>${project.build.directory}/classes/initial/module-shards.conf</file>
+                  <type>xml</type>
+                  <classifier>moduleshardconf</classifier>
+                </artifact>
+                <artifact>
+                  <file>${project.build.directory}/classes/initial/modules.conf</file>
+                  <type>xml</type>
+                  <classifier>moduleconf</classifier>
+                </artifact>
               </artifacts>
             </configuration>
           </execution>
index 05322137aafc2d62b6b18e548a419be91446aeeb..5a2116b50f92070687736329e95441a246ed49b0 100644 (file)
@@ -21,7 +21,7 @@ odl-cluster-data {
     remote {
       log-remote-lifecycle-events = off
       netty.tcp {
-        hostname = "<CHANGE_ME>"
+        hostname = "127.0.0.1"
         port = 2550
         maximum-frame-size = 419430400
         send-buffer-size = 52428800
@@ -30,9 +30,14 @@ odl-cluster-data {
     }
 
     cluster {
-      seed-nodes = ["akka.tcp://opendaylight-cluster-data@<CHANGE_SEED_IP>:2550"]
+      seed-nodes = ["akka.tcp://opendaylight-cluster-data@127.0.0.1:2550"]
 
       auto-down-unreachable-after = 10s
+
+      roles = [
+        "member-1"
+      ]
+
     }
   }
 }
@@ -51,13 +56,13 @@ odl-cluster-rpc {
     remote {
       log-remote-lifecycle-events = off
       netty.tcp {
-        hostname = "<CHANGE_ME>"
+        hostname = "127.0.0.1"
         port = 2551
       }
     }
 
     cluster {
-      seed-nodes = ["akka.tcp://opendaylight-cluster-rpc@<CHANGE_SEED_IP>:2551"]
+      seed-nodes = ["akka.tcp://opendaylight-cluster-rpc@127.0.0.1:2551"]
 
       auto-down-unreachable-after = 10s
     }
index dd5a7f297908ae00288c788bfe4bd63d8fb169c9..82998226b68e3049093a8be77f96227824db123e 100644 (file)
       <artifactId>akka-slf4j_${scala.version}</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-osgi_${scala.version}</artifactId>
+    </dependency>
+
     <!-- SAL Dependencies -->
 
     <dependency>
             <Export-package></Export-package>
             <Private-Package></Private-Package>
             <Import-Package>!*snappy;!org.jboss.*;!com.jcraft.*;!*jetty*;!sun.security.*;*</Import-Package>
+            <!--
             <Embed-Dependency>
                 sal-clustering-commons;
                 sal-akka-raft;
                 *scala*;
             </Embed-Dependency>
             <Embed-Transitive>true</Embed-Transitive>
+            -->
           </instructions>
         </configuration>
       </plugin>
index 15c0548761ecd96afee865025fc573d5a2d38c6e..b326d61fc62211f00d03d636ed635f7e959cf09d 100644 (file)
@@ -10,24 +10,55 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSystem;
 import akka.actor.Props;
-import com.google.common.base.Function;
+import akka.osgi.BundleDelegatingClassLoader;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.osgi.framework.BundleContext;
 
-import javax.annotation.Nullable;
+import java.io.File;
 
 public class ActorSystemFactory {
-    private static final ActorSystem actorSystem = (new Function<Void, ActorSystem>(){
-
-        @Nullable @Override public ActorSystem apply(@Nullable Void aVoid) {
-                ActorSystem system =
-                    ActorSystem.create("opendaylight-cluster-data", ConfigFactory
-                        .load().getConfig("odl-cluster-data"));
-                system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
-                return system;
-        }
-    }).apply(null);
+
+    public static final String AKKA_CONF_PATH = "./configuration/initial/akka.conf";
+    public static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data";
+    public static final String CONFIGURATION_NAME = "odl-cluster-data";
+
+    private static volatile ActorSystem actorSystem = null;
 
     public static final ActorSystem getInstance(){
         return actorSystem;
     }
+
+    /**
+     * This method should be called only once during initialization
+     *
+     * @param bundleContext
+     */
+    public static final ActorSystem createInstance(final BundleContext bundleContext) {
+        if(actorSystem == null) {
+            // Create an OSGi bundle classloader for actor system
+            BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
+                Thread.currentThread().getContextClassLoader());
+            synchronized (ActorSystemFactory.class) {
+                // Double check
+
+                if (actorSystem == null) {
+                    ActorSystem system = ActorSystem.create(ACTOR_SYSTEM_NAME,
+                        ConfigFactory.load(readAkkaConfiguration()).getConfig(CONFIGURATION_NAME), classLoader);
+                    system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
+                    actorSystem = system;
+                }
+            }
+        }
+
+        return actorSystem;
+    }
+
+
+    private static final Config readAkkaConfiguration(){
+        File defaultConfigFile = new File(AKKA_CONF_PATH);
+        Preconditions.checkState(defaultConfigFile.exists(), "akka.conf is missing");
+        return ConfigFactory.parseFile(defaultConfigFile);
+    }
 }
index 202ced9a26512047f57ee8ad2b0ced7e61366771..0a137e07df43a1bb7ca2fb3e854d7f63adfd46a3 100644 (file)
@@ -72,6 +72,8 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
                     actorSystem, actorSystem.actorOf(
                         ShardManager.props(type, cluster, configuration, datastoreContext).
                             withMailbox(ActorContext.MAILBOX), shardManagerId ), cluster, configuration);
+
+        actorContext.setOperationTimeout(dataStoreProperties.getOperationTimeoutInSeconds());
     }
 
     public DistributedDataStore(ActorContext actorContext) {
@@ -98,8 +100,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
 
         Object result = actorContext.executeLocalShardOperation(shardName,
-            new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
-            ActorContext.ASK_DURATION);
+            new RegisterChangeListener(path, dataChangeListenerActor.path(), scope));
 
         if (result != null) {
             RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
index 65a39a60e6c0819fe7f14543c268997a53fddff0..72b593f0106676a28bd251837f2566600d9359f1 100644 (file)
@@ -12,12 +12,13 @@ import akka.actor.ActorSystem;
 
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.sal.core.api.model.SchemaService;
+import org.osgi.framework.BundleContext;
 
 public class DistributedDataStoreFactory {
     public static DistributedDataStore createInstance(String name, SchemaService schemaService,
-            DistributedDataStoreProperties dataStoreProperties) {
+            DistributedDataStoreProperties dataStoreProperties, BundleContext bundleContext) {
 
-        ActorSystem actorSystem = ActorSystemFactory.getInstance();
+        ActorSystem actorSystem = ActorSystemFactory.createInstance(bundleContext);
         Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
         final DistributedDataStore dataStore =
             new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),
index eb6a5361381942306a9be7869d1048801f7c9a0f..df3245ffb225d9d3b0baf704e81e499ebdced314 100644 (file)
@@ -18,21 +18,24 @@ public class DistributedDataStoreProperties {
     private final int maxShardDataChangeExecutorQueueSize;
     private final int maxShardDataChangeExecutorPoolSize;
     private final int shardTransactionIdleTimeoutInMinutes;
+    private final int operationTimeoutInSeconds;
 
     public DistributedDataStoreProperties() {
         maxShardDataChangeListenerQueueSize = 1000;
         maxShardDataChangeExecutorQueueSize = 1000;
         maxShardDataChangeExecutorPoolSize = 20;
         shardTransactionIdleTimeoutInMinutes = 10;
+        operationTimeoutInSeconds = 5;
     }
 
     public DistributedDataStoreProperties(int maxShardDataChangeListenerQueueSize,
             int maxShardDataChangeExecutorQueueSize, int maxShardDataChangeExecutorPoolSize,
-            int shardTransactionIdleTimeoutInMinutes) {
+            int shardTransactionIdleTimeoutInMinutes, int operationTimeoutInSeconds) {
         this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
         this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
         this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
         this.shardTransactionIdleTimeoutInMinutes = shardTransactionIdleTimeoutInMinutes;
+        this.operationTimeoutInSeconds = operationTimeoutInSeconds;
     }
 
     public int getMaxShardDataChangeListenerQueueSize() {
@@ -50,4 +53,8 @@ public class DistributedDataStoreProperties {
     public int getShardTransactionIdleTimeoutInMinutes() {
         return shardTransactionIdleTimeoutInMinutes;
     }
+
+    public int getOperationTimeoutInSeconds() {
+        return operationTimeoutInSeconds;
+    }
 }
index 43a9faa3e44e5d6fe77db7cce74929f8888dcadb..6a6a181b6c03ac744d02a3e8e815011d2cf99c3f 100644 (file)
@@ -14,6 +14,7 @@ import akka.actor.Props;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Creator;
+import akka.persistence.RecoveryFailure;
 import akka.serialization.Serialization;
 
 import com.google.common.base.Optional;
@@ -21,7 +22,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
@@ -144,8 +145,19 @@ public class Shard extends RaftActor {
         return Props.create(new ShardCreator(name, peerAddresses, datastoreContext));
     }
 
+    @Override public void onReceiveRecover(Object message) {
+        LOG.debug("onReceiveRecover: Received message {} from {}", message.getClass().toString(),
+            getSender());
+
+        if (message instanceof RecoveryFailure){
+            LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
+        } else {
+            super.onReceiveRecover(message);
+        }
+    }
+
     @Override public void onReceiveCommand(Object message) {
-        LOG.debug("Received message {} from {}", message.getClass().toString(),
+        LOG.debug("onReceiveCommand: Received message {} from {}", message.getClass().toString(),
             getSender());
 
         if (message.getClass()
@@ -228,7 +240,8 @@ public class Shard extends RaftActor {
             .tell(new CreateTransactionReply(
                     Serialization.serializedActorPath(transactionActor),
                     createTransaction.getTransactionId()).toSerializable(),
-                getSelf());
+                getSelf()
+            );
     }
 
     private void commit(final ActorRef sender, Object serialized) {
@@ -266,9 +279,9 @@ public class Shard extends RaftActor {
         Futures.addCallback(future, new FutureCallback<Void>() {
             @Override
             public void onSuccess(Void v) {
-               sender.tell(new CommitTransactionReply().toSerializable(),self);
-               shardMBean.incrementCommittedTransactionCount();
-               shardMBean.setLastCommittedTransactionTime(new Date());
+                sender.tell(new CommitTransactionReply().toSerializable(), self);
+                shardMBean.incrementCommittedTransactionCount();
+                shardMBean.setLastCommittedTransactionTime(new Date());
             }
 
             @Override
@@ -365,7 +378,6 @@ public class Shard extends RaftActor {
                     identifier, clientActor.path().toString());
             }
 
-
         } else {
             LOG.error("Unknown state received {}", data);
         }
@@ -383,11 +395,11 @@ public class Shard extends RaftActor {
 
     }
 
-    @Override protected Object createSnapshot() {
+    @Override protected void createSnapshot() {
         throw new UnsupportedOperationException("createSnapshot");
     }
 
-    @Override protected void applySnapshot(Object snapshot) {
+    @Override protected void applySnapshot(ByteString snapshot) {
         throw new UnsupportedOperationException("applySnapshot");
     }
 
index c557118b1e8f92234d597ce6372cda2674f6a6bd..a5be69531d73ede89f7bba5978a85d2e045d8989 100644 (file)
@@ -151,8 +151,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
 
             ActorSelection cohort = actorContext.actorSelection(actorPath);
 
-            futureList.add(actorContext.executeRemoteOperationAsync(cohort, message,
-                    ActorContext.ASK_DURATION));
+            futureList.add(actorContext.executeRemoteOperationAsync(cohort, message));
         }
 
         return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
index fc1a3aad74ca95ca0545f319303c50a50cd994df..a8b20c030e1dd34f274ce2e02b56669739824af3 100644 (file)
@@ -353,8 +353,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         try {
             Object response = actorContext.executeShardOperation(shardName,
-                new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(),
-                ActorContext.ASK_DURATION);
+                new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable());
             if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
                 CreateTransactionReply reply =
                     CreateTransactionReply.fromSerializable(response);
@@ -472,7 +471,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // Send the ReadyTransaction message to the Tx actor.
 
             final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
-                    new ReadyTransaction().toSerializable(), ActorContext.ASK_DURATION);
+                    new ReadyTransaction().toSerializable());
 
             // Combine all the previously recorded put/merge/delete operation reply Futures and the
             // ReadyTransactionReply Future into one Future. If any one fails then the combined
@@ -532,23 +531,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         public void deleteData(YangInstanceIdentifier path) {
             LOG.debug("Tx {} deleteData called path = {}", identifier, path);
             recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
-                    new DeleteData(path).toSerializable(), ActorContext.ASK_DURATION ));
+                    new DeleteData(path).toSerializable() ));
         }
 
         @Override
         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
             LOG.debug("Tx {} mergeData called path = {}", identifier, path);
             recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
-                    new MergeData(path, data, schemaContext).toSerializable(),
-                    ActorContext.ASK_DURATION));
+                    new MergeData(path, data, schemaContext).toSerializable()));
         }
 
         @Override
         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
             LOG.debug("Tx {} writeData called path = {}", identifier, path);
             recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
-                    new WriteData(path, data, schemaContext).toSerializable(),
-                    ActorContext.ASK_DURATION));
+                    new WriteData(path, data, schemaContext).toSerializable()));
         }
 
         @Override
@@ -634,7 +631,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             };
 
             Future<Object> readFuture = actorContext.executeRemoteOperationAsync(getActor(),
-                    new ReadData(path).toSerializable(), ActorContext.ASK_DURATION);
+                    new ReadData(path).toSerializable());
             readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
         }
 
@@ -715,7 +712,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             };
 
             Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
-                    new DataExists(path).toSerializable(), ActorContext.ASK_DURATION);
+                    new DataExists(path).toSerializable());
             future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
         }
     }
index 818a8ca8b390ec2fcb33f2cad91ad597fa7ca758..b87dc4f608b191c21d5fa34b8bf4a9744864f96a 100644 (file)
@@ -47,10 +47,7 @@ public class ActorContext {
     private static final Logger
         LOG = LoggerFactory.getLogger(ActorContext.class);
 
-    public static final FiniteDuration ASK_DURATION =
-        Duration.create(5, TimeUnit.SECONDS);
-    public static final Duration AWAIT_DURATION =
-        Duration.create(5, TimeUnit.SECONDS);
+    private static final FiniteDuration DEFAULT_OPER_DURATION = Duration.create(5, TimeUnit.SECONDS);
 
     public static final String MAILBOX = "bounded-mailbox";
 
@@ -59,6 +56,8 @@ public class ActorContext {
     private final ClusterWrapper clusterWrapper;
     private final Configuration configuration;
     private volatile SchemaContext schemaContext;
+    private FiniteDuration operationDuration = DEFAULT_OPER_DURATION;
+    private Timeout operationTimeout = new Timeout(operationDuration);
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
         ClusterWrapper clusterWrapper,
@@ -93,6 +92,11 @@ public class ActorContext {
         }
     }
 
+    public void setOperationTimeout(int timeoutInSeconds) {
+        operationDuration = Duration.create(timeoutInSeconds, TimeUnit.SECONDS);
+        operationTimeout = new Timeout(operationDuration);
+    }
+
     public SchemaContext getSchemaContext() {
         return schemaContext;
     }
@@ -117,7 +121,7 @@ public class ActorContext {
      */
     public ActorRef findLocalShard(String shardName) {
         Object result = executeLocalOperation(shardManager,
-            new FindLocalShard(shardName), ASK_DURATION);
+            new FindLocalShard(shardName));
 
         if (result instanceof LocalShardFound) {
             LocalShardFound found = (LocalShardFound) result;
@@ -133,7 +137,7 @@ public class ActorContext {
 
     public String findPrimaryPath(String shardName) {
         Object result = executeLocalOperation(shardManager,
-            new FindPrimary(shardName).toSerializable(), ASK_DURATION);
+            new FindPrimary(shardName).toSerializable());
 
         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
             PrimaryFound found = PrimaryFound.fromSerializable(result);
@@ -151,16 +155,13 @@ public class ActorContext {
      *
      * @param actor
      * @param message
-     * @param duration
      * @return The response of the operation
      */
-    public Object executeLocalOperation(ActorRef actor, Object message,
-        FiniteDuration duration) {
-        Future<Object> future =
-            ask(actor, message, new Timeout(duration));
+    public Object executeLocalOperation(ActorRef actor, Object message) {
+        Future<Object> future = ask(actor, message, operationTimeout);
 
         try {
-            return Await.result(future, AWAIT_DURATION);
+            return Await.result(future, operationDuration);
         } catch (Exception e) {
             throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
         }
@@ -171,21 +172,19 @@ public class ActorContext {
      *
      * @param actor
      * @param message
-     * @param duration
      * @return
      */
-    public Object executeRemoteOperation(ActorSelection actor, Object message,
-        FiniteDuration duration) {
+    public Object executeRemoteOperation(ActorSelection actor, Object message) {
 
         LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
 
-        Future<Object> future =
-            ask(actor, message, new Timeout(duration));
+        Future<Object> future = ask(actor, message, operationTimeout);
 
         try {
-            return Await.result(future, AWAIT_DURATION);
+            return Await.result(future, operationDuration);
         } catch (Exception e) {
-            throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
+            throw new TimeoutException("Sending message " + message.getClass().toString() +
+                    " to actor " + actor.toString() + " failed" , e);
         }
     }
 
@@ -194,15 +193,13 @@ public class ActorContext {
      *
      * @param actor the ActorSelection
      * @param message the message to send
-     * @param duration the maximum amount of time to send he message
      * @return a Future containing the eventual result
      */
-    public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message,
-            FiniteDuration duration) {
+    public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message) {
 
         LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
 
-        return ask(actor, message, new Timeout(duration));
+        return ask(actor, message, operationTimeout);
     }
 
     /**
@@ -225,16 +222,14 @@ public class ActorContext {
      *
      * @param shardName
      * @param message
-     * @param duration
      * @return
      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException         if the message to the remote shard times out
      * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
      */
-    public Object executeShardOperation(String shardName, Object message,
-        FiniteDuration duration) {
+    public Object executeShardOperation(String shardName, Object message) {
         ActorSelection primary = findPrimary(shardName);
 
-        return executeRemoteOperation(primary, message, duration);
+        return executeRemoteOperation(primary, message);
     }
 
     /**
@@ -246,19 +241,17 @@ public class ActorContext {
      *
      * @param shardName the name of the shard on which the operation needs to be executed
      * @param message the message that needs to be sent to the shard
-     * @param duration the time duration in which this operation should complete
      * @return the message that was returned by the local actor on which the
      *         the operation was executed. If a local shard was not found then
      *         null is returned
      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
      *         if the operation does not complete in a specified time duration
      */
-    public Object executeLocalShardOperation(String shardName, Object message,
-        FiniteDuration duration) {
+    public Object executeLocalShardOperation(String shardName, Object message) {
         ActorRef local = findLocalShard(shardName);
 
         if(local != null) {
-            return executeLocalOperation(local, message, duration);
+            return executeLocalOperation(local, message);
         }
 
         return null;
index f5a0d3783ab011728ab0688db60b404a9c53719e..e2fbacb46169047764d24f8ba09f087693773d84 100644 (file)
@@ -2,9 +2,12 @@ package org.opendaylight.controller.config.yang.config.distributed_datastore_pro
 
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreProperties;
+import org.osgi.framework.BundleContext;
 
 public class DistributedConfigDataStoreProviderModule extends
     org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedConfigDataStoreProviderModule {
+    private BundleContext bundleContext;
+
     public DistributedConfigDataStoreProviderModule(
         org.opendaylight.controller.config.api.ModuleIdentifier identifier,
         org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
@@ -33,9 +36,15 @@ public class DistributedConfigDataStoreProviderModule extends
         }
 
         return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
-                new DistributedDataStoreProperties(props.getMaxShardDataChangeExecutorPoolSize(),
-                        props.getMaxShardDataChangeExecutorQueueSize(),
-                        props.getMaxShardDataChangeListenerQueueSize(),
-                        props.getShardTransactionIdleTimeoutInMinutes()));
+                new DistributedDataStoreProperties(
+                        props.getMaxShardDataChangeExecutorPoolSize().getValue(),
+                        props.getMaxShardDataChangeExecutorQueueSize().getValue(),
+                        props.getMaxShardDataChangeListenerQueueSize().getValue(),
+                        props.getShardTransactionIdleTimeoutInMinutes().getValue(),
+                        props.getOperationTimeoutInSeconds().getValue()), bundleContext);
+    }
+
+    public void setBundleContext(BundleContext bundleContext) {
+        this.bundleContext = bundleContext;
     }
 }
index 67bf5994545af4e8d64b95cf40352bbb908d68a5..0cdaca3a15262883c75ee31c71a121dc0eec0607 100644 (file)
@@ -8,6 +8,29 @@
 * Do not modify this file unless it is present under src/main directory
 */
 package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
+
+import org.opendaylight.controller.config.api.DependencyResolver;
+import org.opendaylight.controller.config.api.DynamicMBeanWithInstance;
+import org.opendaylight.controller.config.spi.Module;
+import org.osgi.framework.BundleContext;
+
 public class DistributedConfigDataStoreProviderModuleFactory extends org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedConfigDataStoreProviderModuleFactory {
 
+    @Override
+    public Module createModule(String instanceName, DependencyResolver dependencyResolver, BundleContext bundleContext) {
+        DistributedConfigDataStoreProviderModule module = (DistributedConfigDataStoreProviderModule)super.createModule(instanceName,dependencyResolver,bundleContext);
+        module.setBundleContext(bundleContext);
+        return module;
+    }
+
+    @Override
+    public Module createModule(String instanceName, DependencyResolver dependencyResolver,
+        DynamicMBeanWithInstance old, BundleContext bundleContext) throws Exception {
+        DistributedConfigDataStoreProviderModule module = (DistributedConfigDataStoreProviderModule)super.createModule(instanceName, dependencyResolver,
+            old, bundleContext);
+        module.setBundleContext(bundleContext);
+        return module;
+    }
+
+
 }
index 443334d11f65378871da7a6f2ae97097d4c2eb87..c185e871ea46cd91f1176a43eda52e9b464d8bd2 100644 (file)
@@ -2,9 +2,12 @@ package org.opendaylight.controller.config.yang.config.distributed_datastore_pro
 
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreProperties;
+import org.osgi.framework.BundleContext;
 
 public class DistributedOperationalDataStoreProviderModule extends
     org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedOperationalDataStoreProviderModule {
+    private BundleContext bundleContext;
+
     public DistributedOperationalDataStoreProviderModule(
         org.opendaylight.controller.config.api.ModuleIdentifier identifier,
         org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
@@ -34,10 +37,16 @@ public class DistributedOperationalDataStoreProviderModule extends
 
         return DistributedDataStoreFactory.createInstance("operational",
                 getOperationalSchemaServiceDependency(),
-                new DistributedDataStoreProperties(props.getMaxShardDataChangeExecutorPoolSize(),
-                        props.getMaxShardDataChangeExecutorQueueSize(),
-                        props.getMaxShardDataChangeListenerQueueSize(),
-                        props.getShardTransactionIdleTimeoutInMinutes()));
+                new DistributedDataStoreProperties(
+                        props.getMaxShardDataChangeExecutorPoolSize().getValue(),
+                        props.getMaxShardDataChangeExecutorQueueSize().getValue(),
+                        props.getMaxShardDataChangeListenerQueueSize().getValue(),
+                        props.getShardTransactionIdleTimeoutInMinutes().getValue(),
+                        props.getOperationTimeoutInSeconds().getValue()), bundleContext);
+    }
+
+    public void setBundleContext(BundleContext bundleContext) {
+        this.bundleContext = bundleContext;
     }
 
 }
index c9965fee09029ee73c6b85687f8f659301680e97..364fe629234612ea58d6c8fb2a9bc7c78ee3e20c 100644 (file)
@@ -8,6 +8,27 @@
 * Do not modify this file unless it is present under src/main directory
 */
 package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
+
+import org.opendaylight.controller.config.api.DependencyResolver;
+import org.opendaylight.controller.config.api.DynamicMBeanWithInstance;
+import org.opendaylight.controller.config.spi.Module;
+import org.osgi.framework.BundleContext;
+
 public class DistributedOperationalDataStoreProviderModuleFactory extends org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedOperationalDataStoreProviderModuleFactory {
+    @Override
+    public Module createModule(String instanceName, DependencyResolver dependencyResolver, BundleContext bundleContext) {
+        DistributedOperationalDataStoreProviderModule module = (DistributedOperationalDataStoreProviderModule)super.createModule(instanceName,dependencyResolver,bundleContext);
+        module.setBundleContext(bundleContext);
+        return module;
+    }
+
+    @Override
+    public Module createModule(String instanceName, DependencyResolver dependencyResolver,
+        DynamicMBeanWithInstance old, BundleContext bundleContext) throws Exception {
+        DistributedOperationalDataStoreProviderModule module = (DistributedOperationalDataStoreProviderModule)super.createModule(instanceName, dependencyResolver,
+            old, bundleContext);
+        module.setBundleContext(bundleContext);
+        return module;
+    }
 
 }
index a9a8a1ad986c65835018f80773707561473934b8..d50be2ca0ef8fdc8123e5b63a62887034bed0bb1 100644 (file)
@@ -36,30 +36,48 @@ module distributed-datastore-provider {
                 config:java-name-prefix DistributedOperationalDataStoreProvider;
      }
 
+    typedef non-zero-uint16-type {
+        type uint16 {
+            range "1..max";
+        }
+    }
+    
+    typedef operation-timeout-type {
+        type uint16 {
+            range "5..max";
+        }
+    }
+    
     grouping data-store-properties {
         leaf max-shard-data-change-executor-queue-size {
             default 1000;
-            type uint16;
+            type non-zero-uint16-type;
             description "The maximum queue size for each shard's data store data change notification executor.";
          }
 
          leaf max-shard-data-change-executor-pool-size {
             default 20;
-            type uint16;
+            type non-zero-uint16-type;
             description "The maximum thread pool size for each shard's data store data change notification executor.";
          }
 
          leaf max-shard-data-change-listener-queue-size {
             default 1000;
-            type uint16;
+            type non-zero-uint16-type;
             description "The maximum queue size for each shard's data store data change listeners.";
          }
          
          leaf shard-transaction-idle-timeout-in-minutes {
             default 10;
-            type uint16;
+            type non-zero-uint16-type;
             description "The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs.";
          }
+         
+         leaf operation-timeout-in-seconds {
+            default 5;
+            type operation-timeout-type;
+            description "The maximum amount of time for akka operations (remote or local) to complete before failing.";
+         }
     }
     
     // Augments the 'configuration' choice node under modules/module.
index e653c3d3717351182a6a57cf570c4a6bf6449500..2ed11cfbda21eff65b3cb6223b671731c891ab67 100644 (file)
@@ -82,8 +82,7 @@ public class DataChangeListenerProxyTest extends AbstractActorTest {
         ActorContext
             testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), new MockConfiguration());
         Object messages = testContext
-            .executeLocalOperation(actorRef, "messages",
-                ActorContext.ASK_DURATION);
+            .executeLocalOperation(actorRef, "messages");
 
         Assert.assertNotNull(messages);
 
index c99a7e8c8c4908133bac2a9710e3549f02e2410c..3d0aaa0082e55e8b73976af4637977dd9111c97d 100644 (file)
@@ -62,8 +62,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{
         ActorContext
             testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)),new MockClusterWrapper(), new MockConfiguration());
         Object messages = testContext
-            .executeLocalOperation(actorRef, "messages",
-                ActorContext.ASK_DURATION);
+            .executeLocalOperation(actorRef, "messages");
 
         Assert.assertNotNull(messages);
 
index e10570cd158bdfc4a1d98132788cd89f47cddc15..e39b9abd65a711e333f9c0315b25de2e3457266a 100644 (file)
@@ -13,6 +13,7 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.TestActorRef;
+import akka.util.Timeout;
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -185,17 +186,17 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
 
             ).build();
 
+        Timeout askTimeout = new Timeout(ASK_RESULT_DURATION);
+
         //This is done so that Modification list is updated which is used during commit
-        Future future =
-            akka.pattern.Patterns.ask(shardTransaction, writeData, 3000);
+        Future<Object> future = akka.pattern.Patterns.ask(shardTransaction, writeData, askTimeout);
 
         //ready transaction creates the cohort so that we get into the
         //block where in commmit is done
         ShardTransactionMessages.ReadyTransaction readyTransaction =
             ShardTransactionMessages.ReadyTransaction.newBuilder().build();
 
-        future =
-            akka.pattern.Patterns.ask(shardTransaction, readyTransaction, 3000);
+        future = akka.pattern.Patterns.ask(shardTransaction, readyTransaction, askTimeout);
 
         //but when the message is sent it will have the MockCommit object
         //so that we can simulate throwing of exception
@@ -216,10 +217,7 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
         when(mockModification.toSerializable()).thenReturn(
             PersistentMessages.CompositeModification.newBuilder().build());
 
-        future =
-            akka.pattern.Patterns.ask(subject,
-                mockForwardCommitTransaction
-                , 3000);
+        future = akka.pattern.Patterns.ask(subject, mockForwardCommitTransaction, askTimeout);
         Await.result(future, ASK_RESULT_DURATION);
     }
 
index adb12b298e99260b6f33a6c309f3c6eb16ca78bb..1cd0f85fa1917057f77144f23009a7edb65c0150 100644 (file)
@@ -35,7 +35,6 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 
 import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.util.List;
 import java.util.concurrent.ExecutionException;
@@ -93,12 +92,12 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         }
 
         stubber.when(actorContext).executeRemoteOperationAsync(any(ActorSelection.class),
-                isA(requestType), any(FiniteDuration.class));
+                isA(requestType));
     }
 
     private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
         verify(actorContext, times(nCohorts)).executeRemoteOperationAsync(
-                any(ActorSelection.class), isA(requestType), any(FiniteDuration.class));
+                any(ActorSelection.class), isA(requestType));
     }
 
     private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
index f69ae88ec873ed044ab40c36a63e6f0b68b9db67..e5392e025158704f44d152306aaa727b64d460e8 100644 (file)
@@ -56,8 +56,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -216,10 +214,6 @@ public class TransactionProxyTest extends AbstractActorTest {
         return getSystem().actorSelection(actorRef.path());
     }
 
-    private FiniteDuration anyDuration() {
-        return any(FiniteDuration.class);
-    }
-
     private CreateTransactionReply createTransactionReply(ActorRef actorRef){
         return CreateTransactionReply.newBuilder()
             .setTransactionActorPath(actorRef.path().toString())
@@ -232,7 +226,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 when(mockActorContext).actorSelection(actorRef.path().toString());
         doReturn(createTransactionReply(actorRef)).when(mockActorContext).
                 executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD),
-                        eqCreateTransaction(memberName, type), anyDuration());
+                        eqCreateTransaction(memberName, type));
         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(
                 anyString(), eq(actorRef.path().toString()));
         doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString());
@@ -259,7 +253,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 READ_ONLY);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+                eq(actorSelection(actorRef)), eqReadData());
 
         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
@@ -269,7 +263,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+                eq(actorSelection(actorRef)), eqReadData());
 
         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
 
@@ -283,7 +277,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
-                executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+                executeRemoteOperationAsync(any(ActorSelection.class), any());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_ONLY);
@@ -296,7 +290,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+                executeRemoteOperationAsync(any(ActorSelection.class), any());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_ONLY);
@@ -308,7 +302,7 @@ public class TransactionProxyTest extends AbstractActorTest {
             throws Throwable {
 
         doThrow(exToThrow).when(mockActorContext).executeShardOperation(
-                anyString(), any(), anyDuration());
+                anyString(), any());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_ONLY);
@@ -348,14 +342,13 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
-                        anyDuration());
+                executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
 
         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+                eq(actorSelection(actorRef)), eqReadData());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_WRITE);
@@ -368,7 +361,7 @@ public class TransactionProxyTest extends AbstractActorTest {
             propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
         } finally {
             verify(mockActorContext, times(0)).executeRemoteOperationAsync(
-                    eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+                    eq(actorSelection(actorRef)), eqReadData());
         }
     }
 
@@ -379,10 +372,10 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqWriteData(expectedNode), anyDuration());
+                eq(actorSelection(actorRef)), eqWriteData(expectedNode));
 
         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+                eq(actorSelection(actorRef)), eqReadData());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_WRITE);
@@ -414,14 +407,14 @@ public class TransactionProxyTest extends AbstractActorTest {
                 READ_ONLY);
 
         doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+                eq(actorSelection(actorRef)), eqDataExists());
 
         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
 
         assertEquals("Exists response", false, exists);
 
         doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+                eq(actorSelection(actorRef)), eqDataExists());
 
         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
 
@@ -443,7 +436,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
-                executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+                executeRemoteOperationAsync(any(ActorSelection.class), any());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_ONLY);
@@ -456,7 +449,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+                executeRemoteOperationAsync(any(ActorSelection.class), any());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_ONLY);
@@ -471,14 +464,13 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
-                        anyDuration());
+                executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
 
         doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+                eq(actorSelection(actorRef)), eqDataExists());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_WRITE);
@@ -491,7 +483,7 @@ public class TransactionProxyTest extends AbstractActorTest {
             propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
         } finally {
             verify(mockActorContext, times(0)).executeRemoteOperationAsync(
-                    eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+                    eq(actorSelection(actorRef)), eqDataExists());
         }
     }
 
@@ -502,10 +494,10 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+                eq(actorSelection(actorRef)), eqDataExists());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_WRITE);
@@ -556,7 +548,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY);
@@ -564,7 +556,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
         verify(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
                 WriteDataReply.SERIALIZABLE_CLASS);
@@ -599,7 +591,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY);
@@ -607,7 +599,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
 
         verify(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
                 MergeDataReply.SERIALIZABLE_CLASS);
@@ -618,7 +610,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
 
         doReturn(deleteDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
+                eq(actorSelection(actorRef)), eqDeleteData());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY);
@@ -626,7 +618,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         transactionProxy.delete(TestModel.TEST_PATH);
 
         verify(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
+                eq(actorSelection(actorRef)), eqDeleteData());
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
                 DeleteDataReply.SERIALIZABLE_CLASS);
@@ -665,13 +657,13 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+                eq(actorSelection(actorRef)), eqReadData());
 
         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_WRITE);
@@ -700,14 +692,13 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite),
-                        anyDuration());
+                executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY);
@@ -736,11 +727,11 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
                 executeRemoteOperationAsync(eq(actorSelection(actorRef)),
-                        isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+                        isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY);
@@ -763,7 +754,7 @@ public class TransactionProxyTest extends AbstractActorTest {
     public void testReadyWithInitialCreateTransactionFailure() throws Exception {
 
         doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
-                anyString(), any(), anyDuration());
+                anyString(), any());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY);
@@ -793,11 +784,11 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
                 executeRemoteOperationAsync(eq(actorSelection(actorRef)),
-                        isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+                        isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY);
@@ -830,7 +821,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+                eq(actorSelection(actorRef)), eqReadData());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_WRITE);
index fda9ccdfdbbd44dda363c92b842d246e389466bb..5d8fb8393d6c4fd773a0a94b6fd156ac02fe1c14 100644 (file)
@@ -117,7 +117,7 @@ public class ActorContextTest extends AbstractActorTest{
                         new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
                             mock(Configuration.class));
 
-                    Object out = actorContext.executeLocalShardOperation("default", "hello", duration("1 seconds"));
+                    Object out = actorContext.executeLocalShardOperation("default", "hello");
 
                     assertEquals("hello", out);
 
@@ -144,7 +144,7 @@ public class ActorContextTest extends AbstractActorTest{
                         new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
                             mock(Configuration.class));
 
-                    Object out = actorContext.executeLocalShardOperation("default", "hello", duration("1 seconds"));
+                    Object out = actorContext.executeLocalShardOperation("default", "hello");
 
                     assertNull(out);
 
@@ -232,7 +232,7 @@ public class ActorContextTest extends AbstractActorTest{
 
                     ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
 
-                    Object out = actorContext.executeRemoteOperation(actor, "hello", duration("3 seconds"));
+                    Object out = actorContext.executeRemoteOperation(actor, "hello");
 
                     assertEquals("hello", out);
 
@@ -261,8 +261,7 @@ public class ActorContextTest extends AbstractActorTest{
 
                     ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
 
-                    Future<Object> future = actorContext.executeRemoteOperationAsync(actor, "hello",
-                            Duration.create(3, TimeUnit.SECONDS));
+                    Future<Object> future = actorContext.executeRemoteOperationAsync(actor, "hello");
 
                     try {
                         Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
index b19fd3a5290b2c8df92917403d7a501bcf79bfcf..8fa3a17f901541f79be1ec45a373ebe097ba69a1 100644 (file)
@@ -12,7 +12,6 @@ import static org.junit.Assert.assertNotNull;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
-import scala.concurrent.duration.FiniteDuration;
 
 public class MockActorContext extends ActorContext {
 
@@ -33,12 +32,12 @@ public class MockActorContext extends ActorContext {
 
 
     @Override public Object executeShardOperation(String shardName,
-        Object message, FiniteDuration duration) {
+        Object message) {
         return executeShardOperationResponse;
     }
 
     @Override public Object executeRemoteOperation(ActorSelection actor,
-        Object message, FiniteDuration duration) {
+        Object message) {
         return executeRemoteOperationResponse;
     }
 
@@ -76,13 +75,13 @@ public class MockActorContext extends ActorContext {
 
     @Override
     public Object executeLocalOperation(ActorRef actor,
-        Object message, FiniteDuration duration) {
+        Object message) {
         return this.executeLocalOperationResponse;
     }
 
     @Override
     public Object executeLocalShardOperation(String shardName,
-        Object message, FiniteDuration duration) {
+        Object message) {
         return this.executeLocalShardOperationResponse;
     }
 }
index 939096e7f306c7c6d3f0b131fa0880f6374fa274..4ddba2f1b9d773b3c4783d3e401fa7df43d32a0e 100644 (file)
@@ -21,8 +21,7 @@ public class TestUtils {
         ActorContext testContext = new ActorContext(actorSystem, actorSystem.actorOf(
             Props.create(DoNothingActor.class)), new MockClusterWrapper(), new MockConfiguration());
         Object messages = testContext
-            .executeLocalOperation(actorRef, "messages",
-                ActorContext.ASK_DURATION);
+            .executeLocalOperation(actorRef, "messages");
 
         Assert.assertNotNull(messages);
 
index 41cdd59d6b3d5414d4d6e002262e7d71c9106563..8d454c4bd6e43ef54aa8c8a24f659e6be1244f68 100644 (file)
             <Export-package></Export-package>
             <Private-Package></Private-Package>
             <Import-Package>!org.iq80.*;!*snappy;!org.jboss.*;!com.jcraft.*;!org.fusesource.*;!*jetty*;!sun.security.*;*</Import-Package>
+            <!--
             <Embed-Dependency>
                 sal-clustering-commons;
                 sal-akka-raft;
                 *uncommons*;
             </Embed-Dependency>
             <Embed-Transitive>true</Embed-Transitive>
+          -->
           </instructions>
         </configuration>
       </plugin>
index f1ca3ccd505e1b1732ad906f75f3662368fdc4a2..6a442c57cc5dfac799760c74d11dedff9668c833 100644 (file)
@@ -10,12 +10,16 @@ package org.opendaylight.controller.remote.rpc;
 
 import akka.actor.ActorSystem;
 import akka.osgi.BundleDelegatingClassLoader;
-import com.typesafe.config.ConfigFactory;
+import org.opendaylight.controller.remote.rpc.utils.AkkaConfigurationReader;
 import org.osgi.framework.BundleContext;
 
 
 public class ActorSystemFactory {
- private static volatile ActorSystem actorSystem = null;
+
+    public static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-rpc";
+    public static final String CONFIGURATION_NAME = "odl-cluster-rpc";
+
+    private static volatile ActorSystem actorSystem = null;
 
   public static final ActorSystem getInstance(){
      return actorSystem;
@@ -26,7 +30,7 @@ public class ActorSystemFactory {
    *
    * @param bundleContext
    */
-  public static final void createInstance(final BundleContext bundleContext) {
+  public static final void createInstance(final BundleContext bundleContext, AkkaConfigurationReader akkaConfigurationReader) {
     if(actorSystem == null) {
       // Create an OSGi bundle classloader for actor system
       BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
@@ -34,8 +38,8 @@ public class ActorSystemFactory {
       synchronized (ActorSystemFactory.class) {
         // Double check
         if (actorSystem == null) {
-          ActorSystem system = ActorSystem.create("opendaylight-cluster-rpc",
-              ConfigFactory.load().getConfig("odl-cluster-rpc"), classLoader);
+          ActorSystem system = ActorSystem.create(ACTOR_SYSTEM_NAME,
+              akkaConfigurationReader.read().getConfig(CONFIGURATION_NAME), classLoader);
           actorSystem = system;
         }
       }
@@ -43,4 +47,5 @@ public class ActorSystemFactory {
       throw new IllegalStateException("Actor system should be created only once. Use getInstance method to access existing actor system");
     }
   }
+
 }
index fc75f7747a0d25361c96f96e77aac30538bb05a5..0e6b795c058877069640a848fe1144575db37443 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.remote.rpc;
 
 
+import org.opendaylight.controller.remote.rpc.utils.DefaultAkkaConfigurationReader;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
 import org.osgi.framework.BundleContext;
@@ -16,7 +17,7 @@ import org.osgi.framework.BundleContext;
 public class RemoteRpcProviderFactory {
     public static RemoteRpcProvider createInstance(final Broker broker, final BundleContext bundleContext){
 
-      ActorSystemFactory.createInstance(bundleContext);
+      ActorSystemFactory.createInstance(bundleContext, new DefaultAkkaConfigurationReader());
       RemoteRpcProvider rpcProvider =
           new RemoteRpcProvider(ActorSystemFactory.getInstance(), (RpcProvisionRegistry) broker);
       broker.registerProvider(rpcProvider);
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/AkkaConfigurationReader.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/AkkaConfigurationReader.java
new file mode 100644 (file)
index 0000000..035ce9a
--- /dev/null
@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.utils;
+
+import com.typesafe.config.Config;
+
+public interface AkkaConfigurationReader {
+    Config read();
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/DefaultAkkaConfigurationReader.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/DefaultAkkaConfigurationReader.java
new file mode 100644 (file)
index 0000000..a44d20c
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.utils;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import java.io.File;
+
+public class DefaultAkkaConfigurationReader implements AkkaConfigurationReader {
+    public static final String AKKA_CONF_PATH = "./configuration/initial/akka.conf";
+
+    @Override public Config read() {
+        File defaultConfigFile = new File(AKKA_CONF_PATH);
+        Preconditions.checkState(defaultConfigFile.exists(), "akka.conf is missing");
+        return ConfigFactory.parseFile(defaultConfigFile);
+
+    }
+}
index ed5fa6d16e330bbf8907aad6a0341b5d543c1952..cd1cd918693c15a2f8343ed46e8fba97b50c7a8a 100644 (file)
@@ -10,9 +10,11 @@ package org.opendaylight.controller.remote.rpc;
 
 
 import akka.actor.ActorSystem;
+import com.typesafe.config.ConfigFactory;
 import junit.framework.Assert;
 import org.junit.After;
 import org.junit.Test;
+import org.opendaylight.controller.remote.rpc.utils.AkkaConfigurationReader;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleContext;
 
@@ -27,13 +29,17 @@ public class ActorSystemFactoryTest {
   public void testActorSystemCreation(){
     BundleContext context = mock(BundleContext.class);
     when(context.getBundle()).thenReturn(mock(Bundle.class));
-    ActorSystemFactory.createInstance(context);
+
+    AkkaConfigurationReader reader = mock(AkkaConfigurationReader.class);
+    when(reader.read()).thenReturn(ConfigFactory.load());
+
+    ActorSystemFactory.createInstance(context, reader);
     system = ActorSystemFactory.getInstance();
     Assert.assertNotNull(system);
     // Check illegal state exception
 
     try {
-      ActorSystemFactory.createInstance(context);
+      ActorSystemFactory.createInstance(context, reader);
       fail("Illegal State exception should be thrown, while creating actor system second time");
     } catch (IllegalStateException e) {
     }
@@ -45,5 +51,4 @@ public class ActorSystemFactoryTest {
       system.shutdown();
     }
   }
-
 }
index 52115a8f32f5ca5c04a5fb0d7c994456cc114544..fe20e3a441590b14f658d025567102977dd97d51 100644 (file)
@@ -1,10 +1,12 @@
 package org.opendaylight.controller.config.yang.md.sal.rest.connector;
 
-import org.opendaylight.controller.sal.rest.impl.RestconfProviderImpl;
+import org.opendaylight.controller.sal.restconf.impl.RestconfProviderImpl;
 
 
 public class RestConnectorModule extends org.opendaylight.controller.config.yang.md.sal.rest.connector.AbstractRestConnectorModule {
 
+    private static RestConnectorRuntimeRegistration runtimeRegistration;
+
     public RestConnectorModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
         super(identifier, dependencyResolver);
     }
@@ -26,6 +28,14 @@ public class RestConnectorModule extends org.opendaylight.controller.config.yang
         instance.setWebsocketPort(getWebsocketPort());
         // Register it with the Broker
         getDomBrokerDependency().registerProvider(instance);
+
+        if(runtimeRegistration != null){
+            runtimeRegistration.close();
+        }
+
+        runtimeRegistration =
+            getRootRuntimeBeanRegistratorWrapper().register(instance);
+
         return instance;
     }
 }
index a298f4b0937ae71c146c9baada15a14535b7de9d..c9496af4c86d4838b6ee570e26b2b84ac8c34e26 100644 (file)
@@ -14,6 +14,7 @@ import javax.ws.rs.core.Application;
 import org.opendaylight.controller.sal.restconf.impl.BrokerFacade;
 import org.opendaylight.controller.sal.restconf.impl.ControllerContext;
 import org.opendaylight.controller.sal.restconf.impl.RestconfImpl;
+import org.opendaylight.controller.sal.restconf.impl.StatisticsRestconfServiceWrapper;
 
 public class RestconfApplication extends Application {
 
@@ -38,7 +39,7 @@ public class RestconfApplication extends Application {
         restconfImpl.setControllerContext(controllerContext);
         singletons.add(controllerContext);
         singletons.add(brokerFacade);
-        singletons.add(restconfImpl);
+        singletons.add(StatisticsRestconfServiceWrapper.getInstance());
         singletons.add(StructuredDataToXmlProvider.INSTANCE);
         singletons.add(StructuredDataToJsonProvider.INSTANCE);
         singletons.add(JsonToCompositeNodeProvider.INSTANCE);
index adad26e141d2315af5287c9ff2b49a02f16f68ac..5d8c910afc31fa9d6420fc6d3a67466c34924317 100644 (file)
@@ -15,6 +15,7 @@ import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import java.math.BigInteger;
 import java.net.URI;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -85,6 +86,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class RestconfImpl implements RestconfService {
+
     private enum UriParameters {
         PRETTY_PRINT("prettyPrint"),
         DEPTH("depth");
@@ -101,6 +103,8 @@ public class RestconfImpl implements RestconfService {
         }
     }
 
+
+
     private final static RestconfImpl INSTANCE = new RestconfImpl();
 
     private static final int NOTIFICATION_PORT = 8181;
@@ -1550,4 +1554,9 @@ public class RestconfImpl implements RestconfService {
         }
         return false;
     }
+
+    public BigInteger getOperationalReceived() {
+        // TODO Auto-generated method stub
+        return null;
+    }
 }
@@ -5,35 +5,41 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.sal.rest.impl;
+package org.opendaylight.controller.sal.restconf.impl;
 
-import java.util.Collection;
-import java.util.Collections;
+import org.opendaylight.controller.config.yang.md.sal.rest.connector.Config;
+import org.opendaylight.controller.config.yang.md.sal.rest.connector.Get;
+import org.opendaylight.controller.config.yang.md.sal.rest.connector.Operational;
+import org.opendaylight.controller.config.yang.md.sal.rest.connector.Post;
+import org.opendaylight.controller.config.yang.md.sal.rest.connector.Put;
+import org.opendaylight.controller.config.yang.md.sal.rest.connector.RestConnectorRuntimeMXBean;
+import org.opendaylight.controller.config.yang.md.sal.rest.connector.Rpcs;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
 import org.opendaylight.controller.sal.core.api.Provider;
 import org.opendaylight.controller.sal.core.api.model.SchemaService;
 import org.opendaylight.controller.sal.rest.api.RestConnector;
-import org.opendaylight.controller.sal.restconf.impl.BrokerFacade;
-import org.opendaylight.controller.sal.restconf.impl.ControllerContext;
 import org.opendaylight.controller.sal.streams.websockets.WebSocketServer;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.PortNumber;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 
-public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnector {
+import java.math.BigInteger;
+import java.util.Collection;
+import java.util.Collections;
 
-    public final static String NOT_INITALIZED_MSG = "Restconf is not initialized yet. Please try again later";
+public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnector, RestConnectorRuntimeMXBean {
 
+    private final StatisticsRestconfServiceWrapper stats = StatisticsRestconfServiceWrapper.getInstance();
     private ListenerRegistration<SchemaContextListener> listenerRegistration;
     private PortNumber port;
+    private Thread webSocketServerThread;
+
     public void setWebsocketPort(PortNumber port) {
         this.port = port;
     }
 
-    private Thread webSocketServerThread;
-
     @Override
     public void onSessionInitiated(ProviderSession session) {
         final DOMDataBroker domDataBroker = session.getService(DOMDataBroker.class);
@@ -58,9 +64,45 @@ public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnec
 
     @Override
     public void close() {
+
         if (listenerRegistration != null) {
             listenerRegistration.close();
         }
+
+        WebSocketServer.destroyInstance();
         webSocketServerThread.interrupt();
     }
+
+    @Override
+    public Config getConfig() {
+        Config config = new Config();
+        Get get = new Get();
+        get.setReceivedRequests(stats.getConfigGet());
+        config.setGet(get);
+        Post post = new Post();
+        post.setReceivedRequests(stats.getConfigPost());
+        config.setPost(post);
+        Put put = new Put();
+        put.setReceivedRequests(stats.getConfigPut());
+        config.setPut(put);
+        return config;
+    }
+
+    @Override
+    public Operational getOperational() {
+        BigInteger opGet = stats.getOperationalGet();
+        Operational operational = new Operational();
+        Get get = new Get();
+        get.setReceivedRequests(opGet);
+        operational.setGet(get);
+        return operational;
+    }
+
+    @Override
+    public Rpcs getRpcs() {
+        BigInteger rpcInvoke = stats.getRpc();
+        Rpcs rpcs = new Rpcs();
+        rpcs.setReceivedRequests(rpcInvoke);
+        return rpcs ;
+    }
 }
diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/StatisticsRestconfServiceWrapper.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/StatisticsRestconfServiceWrapper.java
new file mode 100644 (file)
index 0000000..eafbb81
--- /dev/null
@@ -0,0 +1,150 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.restconf.impl;
+
+import java.math.BigInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriInfo;
+import org.opendaylight.controller.sal.rest.api.RestconfService;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.Node;
+
+public class StatisticsRestconfServiceWrapper implements RestconfService {
+
+    AtomicLong operationalGet = new AtomicLong();
+    AtomicLong configGet = new AtomicLong();
+    AtomicLong rpc = new AtomicLong();
+    AtomicLong configPost = new AtomicLong();
+    AtomicLong configPut = new AtomicLong();
+    AtomicLong configDelete = new AtomicLong();
+
+    private static final StatisticsRestconfServiceWrapper INSTANCE = new StatisticsRestconfServiceWrapper(RestconfImpl.getInstance());
+
+    final RestconfService delegate;
+
+    private StatisticsRestconfServiceWrapper(RestconfService delegate) {
+        this.delegate = delegate;
+    }
+
+    public static StatisticsRestconfServiceWrapper getInstance() {
+        return INSTANCE;
+    }
+
+    @Override
+    public Object getRoot() {
+        return delegate.getRoot();
+    }
+
+    @Override
+    public StructuredData getModules(UriInfo uriInfo) {
+        return delegate.getModules(uriInfo);
+    }
+
+    @Override
+    public StructuredData getModules(String identifier, UriInfo uriInfo) {
+        return delegate.getModules(identifier, uriInfo);
+    }
+
+    @Override
+    public StructuredData getModule(String identifier, UriInfo uriInfo) {
+        return delegate.getModule(identifier, uriInfo);
+    }
+
+    @Override
+    public StructuredData getOperations(UriInfo uriInfo) {
+        return delegate.getOperations(uriInfo);
+    }
+
+    @Override
+    public StructuredData getOperations(String identifier, UriInfo uriInfo) {
+        return delegate.getOperations(identifier, uriInfo);
+    }
+
+    @Override
+    public StructuredData invokeRpc(String identifier, CompositeNode payload, UriInfo uriInfo) {
+        rpc.incrementAndGet();
+        return delegate.invokeRpc(identifier, payload, uriInfo);
+    }
+
+    @Override
+    public StructuredData invokeRpc(String identifier, String noPayload, UriInfo uriInfo) {
+        rpc.incrementAndGet();
+        return delegate.invokeRpc(identifier, noPayload, uriInfo);
+    }
+
+    @Override
+    public NormalizedNodeContext readConfigurationData(String identifier, UriInfo uriInfo) {
+        configGet.incrementAndGet();
+        return delegate.readConfigurationData(identifier, uriInfo);
+    }
+
+    @Override
+    public NormalizedNodeContext readOperationalData(String identifier, UriInfo uriInfo) {
+        operationalGet.incrementAndGet();
+        return delegate.readOperationalData(identifier, uriInfo);
+    }
+
+    @Override
+    public Response updateConfigurationData(String identifier, Node<?> payload) {
+        configPut.incrementAndGet();
+        return delegate.updateConfigurationData(identifier, payload);
+    }
+
+    @Override
+    public Response createConfigurationData(String identifier, Node<?> payload) {
+        configPost.incrementAndGet();
+        return delegate.createConfigurationData(identifier, payload);
+    }
+
+    @Override
+    public Response createConfigurationData(Node<?> payload) {
+        configPost.incrementAndGet();
+        return delegate.createConfigurationData(payload);
+    }
+
+    @Override
+    public Response deleteConfigurationData(String identifier) {
+        return delegate.deleteConfigurationData(identifier);
+    }
+
+    @Override
+    public Response subscribeToStream(String identifier, UriInfo uriInfo) {
+        return delegate.subscribeToStream(identifier, uriInfo);
+    }
+
+    @Override
+    public StructuredData getAvailableStreams(UriInfo uriInfo) {
+        return delegate.getAvailableStreams(uriInfo);
+    }
+
+    public BigInteger getConfigDelete() {
+        return BigInteger.valueOf(configDelete.get());
+    }
+
+    public BigInteger getConfigGet() {
+        return BigInteger.valueOf(configGet.get());
+    }
+
+    public BigInteger getConfigPost() {
+        return BigInteger.valueOf(configPost.get());
+    }
+
+    public BigInteger getConfigPut() {
+        return BigInteger.valueOf(configPut.get());
+    }
+
+    public BigInteger getOperationalGet() {
+        return BigInteger.valueOf(operationalGet.get());
+    }
+
+    public BigInteger getRpc() {
+        return BigInteger.valueOf(rpc.get());
+    }
+
+}
index 67ed44f84ea86020b54553f3f0b4f108e4ada66f..0a5f5f0ff03be26086ede6a78d3ba2dd156e812c 100644 (file)
@@ -16,11 +16,10 @@ import org.slf4j.LoggerFactory;
 public class WebSocketServer implements Runnable {
 
     private static final Logger logger = LoggerFactory.getLogger(WebSocketServer.class);
-    public static final String WEBSOCKET_SERVER_CONFIG_PROPERTY = "restconf.websocket.port";
     public static final int DEFAULT_PORT = 8181;
     private EventLoopGroup bossGroup;
     private EventLoopGroup workerGroup;
-    private static WebSocketServer singleton = null;
+    private static WebSocketServer instance = null;
     private int port = DEFAULT_PORT;
 
     private WebSocketServer(int port) {
@@ -35,14 +34,11 @@ public class WebSocketServer implements Runnable {
      * @return instance of {@link WebSocketServer}
      */
     public static WebSocketServer createInstance(int port) {
-        if (singleton != null) {
-            throw new IllegalStateException("createInstance() has already been called");
-        }
-        if (port < 1024) {
-            throw new IllegalArgumentException("Privileged port (below 1024) is not allowed");
-        }
-        singleton = new WebSocketServer(port);
-        return singleton;
+        Preconditions.checkState(instance == null, "createInstance() has already been called");
+        Preconditions.checkArgument(port > 1024, "Privileged port (below 1024) is not allowed");
+
+        instance = new WebSocketServer(port);
+        return instance;
     }
 
     /**
@@ -58,18 +54,18 @@ public class WebSocketServer implements Runnable {
      * @return instance of {@link WebSocketServer}
      */
     public static WebSocketServer getInstance() {
-        Preconditions.checkNotNull(singleton, "createInstance() must be called prior to getInstance()");
-        return singleton;
+        Preconditions.checkNotNull(instance, "createInstance() must be called prior to getInstance()");
+        return instance;
     }
 
     /**
      * Destroy this already created instance
      */
     public static void destroyInstance() {
-        if (singleton == null) {
-            throw new IllegalStateException("createInstance() must be called prior to destroyInstance()");
-        }
-        getInstance().stop();
+        Preconditions.checkState(instance != null, "createInstance() must be called prior to destroyInstance()");
+
+        instance.stop();
+        instance = null;
     }
 
     @Override
@@ -99,9 +95,11 @@ public class WebSocketServer implements Runnable {
         Notificator.removeAllListeners();
         if (bossGroup != null) {
             bossGroup.shutdownGracefully();
+            bossGroup = null;
         }
         if (workerGroup != null) {
             workerGroup.shutdownGracefully();
+            workerGroup = null;
         }
     }
 
index a8fc8ff4d54b6e149e562570b38a69ab01afcfba..6d2add6ff14313b8824a5ade4d8cba187260bc24 100644 (file)
@@ -27,6 +27,12 @@ module opendaylight-rest-connector {
         config:java-name-prefix RestConnector;
     }
     
+    grouping statistics {
+        leaf received-requests {
+           type uint64;
+        }
+    }
+
     augment "/config:modules/config:module/config:configuration" {
         case rest-connector-impl {
             when "/config:modules/config:module/config:type = 'rest-connector-impl'";
@@ -44,4 +50,33 @@ module opendaylight-rest-connector {
             }
         }
     }
+    
+    augment "/config:modules/config:module/config:state" {
+        case rest-connector-impl {
+            when "/config:modules/config:module/config:type = 'rest-connector-impl'";
+            container rpcs {
+                uses statistics;
+            }
+
+            container config {
+                container get {
+                    uses statistics;
+                }
+                
+                container post {
+                    uses statistics;
+                }
+                
+                container put {
+                    uses statistics;
+                }
+            }
+
+            container operational {
+                container get {
+                    uses statistics;
+                }
+            }
+        }
+    }
 }
\ No newline at end of file