Merge "BUG 1623 - Clustering : Parsing Error thrown on startup"
authorEd Warnicke <eaw@cisco.com>
Thu, 4 Sep 2014 22:53:58 +0000 (22:53 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 4 Sep 2014 22:53:58 +0000 (22:53 +0000)
89 files changed:
features/akka/pom.xml [new file with mode: 0644]
features/akka/src/main/resources/features.xml [new file with mode: 0644]
features/base/pom.xml
features/base/src/main/resources/features.xml
features/mdsal/pom.xml
features/mdsal/src/main/resources/features.xml
features/pom.xml
opendaylight/commons/opendaylight/pom.xml
opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/etc/custom.properties
opendaylight/distribution/opendaylight/pom.xml
opendaylight/distribution/opendaylight/src/main/resources/configuration/config.ini
opendaylight/md-sal/sal-akka-raft/pom.xml
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SerializationUtils.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/InstallSnapshotMessages.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/resources/InstallSnapshot.proto [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingTranslatedTransactionChain.java
opendaylight/md-sal/sal-clustering-commons/pom.xml
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/persistent/PersistentMessages.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/XmlUtils.java
opendaylight/md-sal/sal-clustering-commons/src/main/resources/Persistent.proto
opendaylight/md-sal/sal-clustering-config/pom.xml
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf
opendaylight/md-sal/sal-distributed-datastore/pom.xml
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ActorSystemFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreProperties.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModuleFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModuleFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModificationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/TestUtils.java
opendaylight/md-sal/sal-remoterpc-connector/pom.xml
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorSystemFactory.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcErrorsException.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ErrorResponse.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/ActorUtil.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/AkkaConfigurationReader.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/DefaultAkkaConfigurationReader.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/ActorSystemFactoryTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/test-rpc.yang [new file with mode: 0644]
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/rest/connector/RestConnectorModule.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/RestconfApplication.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfImpl.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfProviderImpl.java [moved from opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/RestconfProviderImpl.java with 59% similarity]
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/StatisticsRestconfServiceWrapper.java [new file with mode: 0644]
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServer.java
opendaylight/md-sal/sal-rest-connector/src/main/yang/opendaylight-rest-connector.yang

diff --git a/features/akka/pom.xml b/features/akka/pom.xml
new file mode 100644 (file)
index 0000000..f1f3017
--- /dev/null
@@ -0,0 +1,264 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Necessary TODO: Put your copyright here.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+   <modelVersion>4.0.0</modelVersion>
+   <parent>
+    <groupId>org.opendaylight.controller</groupId>
+    <artifactId>commons.opendaylight</artifactId>
+    <version>1.4.2-SNAPSHOT</version>
+    <relativePath>../../opendaylight/commons/opendaylight</relativePath>
+   </parent>
+   <artifactId>features-akka</artifactId>
+   <groupId>org.opendaylight.controller</groupId>
+   <packaging>jar</packaging>
+   <properties>
+      <features.file>features.xml</features.file>
+      <!-- Optional TODO: Move these properties to your parent pom and possibly
+            DependencyManagement section of your parent pom -->
+      <branding.version>1.0.0-SNAPSHOT</branding.version>
+      <karaf.resources.version>1.4.2-SNAPSHOT</karaf.resources.version>
+      <karaf.version>3.0.1</karaf.version>
+      <feature.test.version>0.6.2-SNAPSHOT</feature.test.version>
+      <karaf.empty.version>1.4.2-SNAPSHOT</karaf.empty.version>
+      <surefire.version>2.16</surefire.version>
+   </properties>
+   <dependencies>
+    <!--
+      Necessary TODO: Put dependencies on any feature repos
+      you use in your features.xml file.
+
+      Note: they will need to be <type>xml</xml>
+      and <classifier>features</classifier>.
+      One other thing to watch for is to make sure they are
+      <scope>compile</compile>, which they should be by default,
+      but be cautious lest they be at a different scope in a parent pom.
+
+      Examples:
+        <dependency>
+          <groupId>org.opendaylight.yangtools</groupId>
+          <artifactId>features-yangtools</artifactId>
+          <version>0.6.2-SNAPSHOT</version>
+          <classifier>features</classifier>
+          <type>xml</type>
+        </dependency>
+        <dependency>
+          <groupId>org.opendaylight.controller</groupId>
+          <artifactId>features-mdsal</artifactId>
+          <version>1.1-SNAPSHOT</version>
+          <classifier>features</classifier>
+          <type>xml</type>
+        </dependency>
+        <dependency>
+          <groupId>org.opendaylight.openflowplugin</groupId>
+          <artifactId>features-openflowplugin</artifactId>
+          <version>0.0.3-SNAPSHOT</version>
+          <classifier>features</classifier>
+          <type>xml</type>
+        </dependency>
+    -->
+
+    <!--
+      Necessary TODO: Put dependencies for bundles directly referenced
+      in your features.xml file.  For every <bundle> reference in your
+      features.xml file, you need a corresponding dependency here.
+
+      Examples:
+      <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>controller-provider</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>controller-model</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+    -->
+
+    <!--
+      Necessary TODO: Put dependencies for configfiles directly referenced
+      in your features.xml file.  For every <configfile> reference in your
+      features.xml file, you need a corresponding dependency here.
+
+      Example (presuming here version is coming from the parent pom):
+      <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>controller-config</artifactId>
+        <version>${project.version}</version>
+        <type>xml</type>
+        <classifier>config</classifier>
+      </dependency>
+    -->
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>${scala.version}.${scala.micro.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-reflect</artifactId>
+      <version>${scala.version}.${scala.micro.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe</groupId>
+      <artifactId>config</artifactId>
+      <version>${typesafe.config.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-actor_${scala.version}</artifactId>
+      <version>${akka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-slf4j_${scala.version}</artifactId>
+      <version>${akka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-osgi_${scala.version}</artifactId>
+      <version>${akka.version}</version>
+    </dependency>
+    <dependency>
+        <groupId>org.uncommons.maths</groupId>
+        <artifactId>uncommons-maths</artifactId>
+        <version>${uncommons.maths.version}</version>
+        <exclusions>
+            <exclusion>
+                <groupId>jfree</groupId>
+                <artifactId>jcommon</artifactId>
+            </exclusion>
+            <exclusion>
+                <groupId>jfree</groupId>
+                <artifactId>jfreechart</artifactId>
+            </exclusion>
+        </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>${protobuf.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty</artifactId>
+      <version>3.8.0.Final</version>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-remote_${scala.version}</artifactId>
+      <version>${akka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-cluster_${scala.version}</artifactId>
+      <version>${akka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.iq80.leveldb</groupId>
+      <artifactId>leveldb</artifactId>
+      <version>${leveldb.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.fusesource.leveldbjni</groupId>
+      <artifactId>leveldbjni-all</artifactId>
+      <version>${leveldbjni.version}</version>
+    </dependency>
+    <!--
+      Optional TODO: Remove TODO comments.
+    -->
+    <!-- test to validate features.xml -->
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>features-test</artifactId>
+      <version>${feature.test.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <!-- dependency for opendaylight-karaf-empty for use by testing -->
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>opendaylight-karaf-empty</artifactId>
+      <version>${karaf.empty.version}</version>
+      <type>zip</type>
+    </dependency>
+    <!-- Uncomment this if you get an error : java.lang.NoSuchMethodError: org.slf4j.helpers.MessageFormatter.format(Ljava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)Lorg/slf4j/helpers/FormattingTuple;
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-simple</artifactId>
+      <version>1.7.2</version>
+    </dependency>
+    -->
+
+   </dependencies>
+   <build>
+      <resources>
+         <resource>
+            <directory>src/main/resources</directory>
+            <filtering>true</filtering>
+         </resource>
+      </resources>
+      <plugins>
+         <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-resources-plugin</artifactId>
+            <executions>
+               <execution>
+                  <id>filter</id>
+                  <phase>generate-resources</phase>
+                  <goals>
+                     <goal>resources</goal>
+                  </goals>
+               </execution>
+            </executions>
+         </plugin>
+         <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <executions>
+               <execution>
+                  <id>attach-artifacts</id>
+                  <phase>package</phase>
+                  <goals>
+                     <goal>attach-artifact</goal>
+                  </goals>
+                  <configuration>
+                     <artifacts>
+                        <artifact>
+                           <file>${project.build.directory}/classes/${features.file}</file>
+                           <type>xml</type>
+                           <classifier>features</classifier>
+                        </artifact>
+                     </artifacts>
+                  </configuration>
+               </execution>
+            </executions>
+         </plugin>
+         <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <version>${surefire.version}</version>
+            <configuration>
+              <systemPropertyVariables>
+                <karaf.distro.groupId>org.opendaylight.controller</karaf.distro.groupId>
+                <karaf.distro.artifactId>opendaylight-karaf-empty</karaf.distro.artifactId>
+                <karaf.distro.version>${karaf.empty.version}</karaf.distro.version>
+              </systemPropertyVariables>
+              <dependenciesToScan>
+               <dependency>org.opendaylight.yangtools:features-test</dependency>
+              </dependenciesToScan>
+            </configuration>
+          </plugin>
+      </plugins>
+   </build>
+   <scm>
+      <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
+      <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+      <tag>HEAD</tag>
+      <url>https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=summary</url>
+   </scm>
+</project>
diff --git a/features/akka/src/main/resources/features.xml b/features/akka/src/main/resources/features.xml
new file mode 100644 (file)
index 0000000..182ff76
--- /dev/null
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- vi: set et smarttab sw=4 tabstop=4: -->
+<!--
+ Necessary TODO: Put your copyright statement here
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<features name="odl-controller-${project.version}" xmlns="http://karaf.apache.org/xmlns/features/v1.2.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://karaf.apache.org/xmlns/features/v1.2.0 http://karaf.apache.org/xmlns/features/v1.2.0">
+    <!--
+        Necessary TODO: Please read the features guidelines:
+        https://wiki.opendaylight.org/view/Runtime:Karaf_Features_Guidelines#Feature_Best_Practices
+    -->
+    <!--
+    Necessary TODO: Add repo entries for the repositories of features you refer to
+        in this feature file but do not define here.
+        Examples:
+            <repository>mvn:org.opendaylight.yangtools/features-yangtools/0.6.2-SNAPSHOT/xml/features</repository>
+            <repository>mvn:org.opendaylight.controller/features-mdsal/1.1-SNAPSHOT/xml/features</repository>
+            <repository>mvn:org.opendaylight.openflowplugin/features-openflowplugin/0.0.3-SNAPSHOT/xml/features</repository>
+    -->
+    <feature name='odl-akka-all' version='${project.version}' description='OpenDaylight :: Akka :: All'>
+        <!--
+            Necessary TODO:
+            List all of the user consumable features you define in this feature file here.
+            Generally you would *not* list individual bundles here, but only features defined in *this* file.
+            It is useful to list them in the same order they occur in the file.
+
+            Examples:
+            <feature version='${project.version}'>odl-controller-provider</feature>
+            <feature version='${project.version}'>odl-controller-model</feature>
+        -->
+        <feature version="${scala.version}">odl-akka-scala</feature>
+        <feature version="${akka.version}">odl-akka-system</feature>
+        <feature version="${akka.version}">odl-akka-clustering</feature>
+        <feature version='0.7'>odl-akka-leveldb</feature>
+        <feature version="${akka.version}">odl-akka-persistence</feature>
+    </feature>
+    <!--
+        Necessary TODO: Define your features.  It is useful to list then in order of dependency.  So if A depends on B, list A first.
+        When naming your features please be mindful of the guidelines:
+            https://wiki.opendaylight.org/view/Runtime:Karaf_Features_Guidelines
+        Particularly:
+            a) Prefixing names with 'odl-': https://wiki.opendaylight.org/view/Runtime:Karaf_Features_Guidelines#Feature_Naming
+            b) Descriptions: https://wiki.opendaylight.org/view/Runtime:Karaf_Features_Guidelines#Description
+            c) Avoid start-levels: https://wiki.opendaylight.org/view/Runtime:Karaf_Features_Guidelines#Avoid_start-levels
+
+        It's also nice to list inside a feature, first the features it needs, then the bundles it needs, then the configfiles.
+        Examples:
+
+        * Basic MD-SAL Provider
+        <feature name='odl-controller-provider' version='${project.version}' description='OpenDaylight :: controller :: Provider '>
+            <feature version='1.1-SNAPSHOT'>odl-mdsal-broker</feature>
+            <feature version='${project.version}'>odl-controller-model</feature>
+            <bundle>mvn:org.opendaylight.controller/controller-provider/${project.version}</bundle>
+            ... whatever other bundles you need
+        </feature>
+
+        * Basic MD-SAL Model feature
+        <feature name='odl-controller-model' version='${project.version}' description='OpenDaylight :: controller :: Model'>
+            <feature version='0.6.2-SNAPSHOT'>odl-yangtools-binding</feature>
+            <feature version='0.6.2-SNAPSHOT'>odl-yangtools-models</feature>
+            <bundle>mvn:org.opendaylight.controller/controller-model/${project.version}</bundle>
+            ... whatever other bundles you need
+        </feature>
+
+        * Config Subsystem example - the config file is your config subsystem configuration
+        <feature name='odl-controller-provider' version='${project.version}' description='OpenDaylight :: controller :: Provider'>
+            <feature version='1.1-SNAPSHOT'>odl-mdsal-broker</feature>
+            <bundle>mvn:org.opendaylight.controller/controller-provider/${project.version}</bundle>
+            <configfile finalname="etc/opendaylight/karaf/80-controller.xml">mvn:org.opendaylight.controller/controller-config/${project.version}/xml/config</configfile>
+            ... whatever other bundles you need
+        </feature>
+
+        * Basic MD-SAL Provider that uses openflowplugin-flow-services (which brings along odl-mdsal-broker)
+        <feature name='odl-controller-provider' version='${project.version}' description='OpenDaylight :: controller :: Provider'>
+            <feature version='0.0.3-SNAPSHOT'>odl-openflowplugin-flow-services</feature>
+            <bundle>mvn:org.opendaylight.controller/controller-provider/${project.version}</bundle>
+            ... whatever other bundles you need
+        </feature>
+
+    -->
+    <feature name="odl-akka-scala" description="Scala Runtime for OpenDaylight" version="${scala.version}">
+        <bundle>mvn:org.scala-lang/scala-library/${scala.version}.${scala.micro.version}</bundle>
+        <bundle>mvn:org.scala-lang/scala-reflect/${scala.version}.${scala.micro.version}</bundle>
+    </feature>
+    <feature name="odl-akka-system" description="Akka Actor Framework System Bundles" version="${akka.version}">
+        <feature version="${scala.version}">odl-akka-scala</feature>
+        <bundle>mvn:com.typesafe/config/${typesafe.config.version}</bundle>
+        <bundle>mvn:com.typesafe.akka/akka-actor_${scala.version}/${akka.version}</bundle>
+        <bundle>mvn:com.typesafe.akka/akka-slf4j_${scala.version}/${akka.version}</bundle>
+        <bundle>mvn:com.typesafe.akka/akka-osgi_${scala.version}/${akka.version}</bundle>
+    </feature>
+    <feature name="odl-akka-clustering" description="Akka Clustering" version="${akka.version}">
+        <feature version="${akka.version}">odl-akka-system</feature>
+        <bundle>wrap:mvn:org.uncommons.maths/uncommons-maths/${uncommons.maths.version}</bundle>
+        <bundle>mvn:com.google.protobuf/protobuf-java/${protobuf.version}</bundle>
+        <bundle>mvn:io.netty/netty/3.8.0.Final</bundle>
+        <bundle>mvn:com.typesafe.akka/akka-remote_${scala.version}/${akka.version}</bundle>
+        <bundle>mvn:com.typesafe.akka/akka-cluster_${scala.version}/${akka.version}</bundle>
+    </feature>
+    <feature name='odl-akka-leveldb' description='LevelDB' version='0.7'>
+        <bundle>wrap:mvn:org.iq80.leveldb/leveldb/${leveldb.version}</bundle>
+        <bundle>mvn:org.fusesource.leveldbjni/leveldbjni-all/${leveldbjni.version}</bundle>
+    </feature>
+    <feature name='odl-akka-persistence' description='Akka Persistence' version="${akka.version}">
+        <feature version='0.7'>odl-akka-leveldb</feature>
+        <feature version="${akka.version}">odl-akka-system</feature>
+        <bundle>mvn:com.typesafe.akka/akka-persistence-experimental_${scala.version}/${akka.version}</bundle>
+        <bundle>wrap:mvn:com.google.protobuf/protobuf-java/${protobuf.version}$overwrite=merge&amp;DynamicImport-Package=org.opendaylight.controller.protobuff.messages.*;org.opendaylight.controller.cluster.raft.protobuff.client.messages.*</bundle>
+    </feature>
+    <!-- Optional TODO: Remove TODO Comments -->
+
+</features>
index ed8e2a8..8fec90f 100644 (file)
     <dependency>
       <groupId>orbit</groupId>
       <artifactId>org.apache.catalina</artifactId>
-      <version>7.0.53.v201406061610</version>
     </dependency>
     <dependency>
       <groupId>orbit</groupId>
       <artifactId>org.apache.catalina.ha</artifactId>
-      <version>7.0.53.v201406070630</version>
     </dependency>
     <dependency>
       <groupId>orbit</groupId>
       <artifactId>org.apache.catalina.tribes</artifactId>
-      <version>7.0.53.v201406070630</version>
     </dependency>
     <dependency>
       <groupId>orbit</groupId>
       <artifactId>org.apache.coyote</artifactId>
-      <version>7.0.53.v201406070630</version>
     </dependency>
     <dependency>
       <groupId>orbit</groupId>
       <artifactId>org.apache.el</artifactId>
-      <version>7.0.53.v201406060720</version>
     </dependency>
     <dependency>
       <groupId>orbit</groupId>
       <artifactId>org.apache.jasper</artifactId>
-      <version>7.0.53.v201406070630</version>
     </dependency>
     <dependency>
       <groupId>orbit</groupId>
       <artifactId>org.apache.juli.extras</artifactId>
-      <version>7.0.53.v201406060720</version>
     </dependency>
     <dependency>
       <groupId>orbit</groupId>
       <artifactId>org.apache.tomcat.api</artifactId>
-      <version>7.0.53.v201406060720</version>
     </dependency>
     <dependency>
       <groupId>orbit</groupId>
       <artifactId>org.apache.tomcat.util</artifactId>
-      <version>7.0.53.v201406070630</version>
     </dependency>
     <dependency>
       <groupId>org.aopalliance</groupId>
index 999cf70..e4c455c 100644 (file)
    <feature name="odl-base-tomcat" description="OpenDaylight Tomcat" version="7.0.53">
       <feature>odl-base-gemini-web</feature>
       <feature>odl-base-eclipselink-persistence</feature>
-      <bundle start="true">mvn:orbit/org.apache.catalina/${commons.karaf.catalina}</bundle>
+      <bundle start="true">mvn:orbit/org.apache.catalina/${commons.catalina}</bundle>
       <bundle start="true">mvn:geminiweb/org.eclipse.gemini.web.tomcat/${geminiweb.version}</bundle>
-      <bundle start="true">mvn:orbit/org.apache.catalina.ha/${commons.karaf.catalina.ha}</bundle>
-      <bundle start="true">mvn:orbit/org.apache.catalina.tribes/${commons.karaf.catalina.tribes}</bundle>
-      <bundle start="true">mvn:orbit/org.apache.coyote/${commons.karaf.coyote}</bundle>
-      <bundle start="true">mvn:orbit/org.apache.el/${commons.karaf.el}</bundle>
-      <bundle start="true">mvn:orbit/org.apache.jasper/${commons.karaf.jasper}</bundle>
-      <bundle start="true">mvn:orbit/org.apache.juli.extras/${commons.karaf.juli.version}</bundle>
-      <bundle start="true">mvn:orbit/org.apache.tomcat.api/${commons.karaf.tomcat.api}</bundle>
-      <bundle start="true">mvn:orbit/org.apache.tomcat.util/${commons.karaf.tomcat.util}</bundle>
+      <bundle start="true">mvn:orbit/org.apache.catalina.ha/${commons.catalina.ha}</bundle>
+      <bundle start="true">mvn:orbit/org.apache.catalina.tribes/${commons.catalina.tribes}</bundle>
+      <bundle start="true">mvn:orbit/org.apache.coyote/${commons.coyote}</bundle>
+      <bundle start="true">mvn:orbit/org.apache.el/${commons.el}</bundle>
+      <bundle start="true">mvn:orbit/org.apache.jasper/${commons.jasper}</bundle>
+      <bundle start="true">mvn:orbit/org.apache.juli.extras/${commons.juli.version}</bundle>
+      <bundle start="true">mvn:orbit/org.apache.tomcat.api/${commons.tomcat.api}</bundle>
+      <bundle start="true">mvn:orbit/org.apache.tomcat.util/${commons.tomcat.util}</bundle>
       <bundle start="true" >mvn:org.opendaylight.controller/karaf-tomcat-security/${karaf.security.version}</bundle>
       <bundle start="true">wrap:mvn:virgomirror/org.eclipse.jdt.core.compiler.batch/${eclipse.jdt.core.compiler.batch.version}</bundle>
    </feature>
index c6856c8..38fe92f 100644 (file)
       <classifier>features</classifier>
       <type>xml</type>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>features-akka</artifactId>
+      <version>${commons.opendaylight.version}</version>
+      <classifier>features</classifier>
+      <type>xml</type>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-core-api</artifactId>
       <type>xml</type>
       <classifier>config</classifier>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-distributed-datastore</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-remoterpc-connector</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-clustering-commons</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-akka-raft</artifactId>
+      <version>${mdsal.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-clustering-config</artifactId>
+      <version>${mdsal.version}</version>
+      <type>xml</type>
+      <classifier>config</classifier>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-netconf-connector</artifactId>
index 408be62..ae73c71 100644 (file)
@@ -7,11 +7,13 @@
     <repository>mvn:org.opendaylight.controller/features-config/${config.version}/xml/features</repository>
     <repository>mvn:org.opendaylight.controller/features-config-persister/${config.version}/xml/features</repository>
     <repository>mvn:org.opendaylight.controller/features-config-netty/${config.version}/xml/features</repository>
+    <repository>mvn:org.opendaylight.controller/features-akka/${commons.opendaylight.version}/xml/features</repository>
     <feature name='odl-mdsal-all' version='${project.version}' description="OpenDaylight :: MDSAL :: All">
         <feature version='${project.version}'>odl-mdsal-broker</feature>
         <feature version='${project.version}'>odl-mdsal-netconf-connector</feature>
         <feature version='${project.version}'>odl-restconf</feature>
         <feature version='${project.version}'>odl-mdsal-xsql</feature>
+        <feature version='${project.version}'>odl-mdsal-clustering</feature>
         <feature version='${project.version}'>odl-toaster</feature>
     </feature>
     <feature name='odl-mdsal-broker' version='${project.version}' description="OpenDaylight :: MDSAL :: Broker">
         <bundle>mvn:com.sun.jersey/jersey-servlet/${jersey.version}</bundle>
         <bundle>wrap:mvn:org.json/json/${org.json.version}</bundle>
     </feature>
+    <feature name ='odl-mdsal-clustering-commons' version='${project.version}'>
+        <feature version='${project.version}'>odl-mdsal-broker</feature>
+        <feature version='${akka.version}'>odl-akka-system</feature>
+        <feature version='${akka.version}'>odl-akka-persistence</feature>
+        <bundle>mvn:org.opendaylight.controller/sal-clustering-commons/${project.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/sal-akka-raft/${project.version}</bundle>
+        <bundle>mvn:com.codahale.metrics/metrics-core/3.0.1</bundle>
+    </feature>
+    <feature name ='odl-mdsal-distributed-datastore' version='${project.version}'>
+        <feature version='${project.version}'>odl-mdsal-broker</feature>
+        <feature version='${project.version}'>odl-mdsal-clustering-commons</feature>
+        <feature version='${akka.version}'>odl-akka-clustering</feature>
+        <bundle>mvn:org.opendaylight.controller/sal-distributed-datastore/${project.version}</bundle>
+    </feature>
+    <feature name ='odl-mdsal-remoterpc-connector' version='${project.version}'>
+        <feature version='${project.version}'>odl-mdsal-broker</feature>
+        <feature version='${project.version}'>odl-mdsal-clustering-commons</feature>
+        <feature version='${akka.version}'>odl-akka-clustering</feature>
+        <feature version='0.7'>odl-akka-leveldb</feature>
+        <bundle>mvn:org.opendaylight.controller/sal-remoterpc-connector/${project.version}</bundle>
+    </feature>
+    <feature name ='odl-mdsal-clustering' version='${project.version}'>
+        <feature version='${project.version}'>odl-mdsal-remoterpc-connector</feature>
+        <feature version='${project.version}'>odl-mdsal-distributed-datastore</feature>
+        <configfile finalname="${config.configfile.directory}/${config.clustering.configfile}">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/config</configfile>
+        <configfile finalname="configuration/initial/akka.conf">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/akkaconf</configfile>
+        <configfile finalname="configuration/initial/module-shards.conf">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/moduleshardconf</configfile>
+        <configfile finalname="configuration/initial/modules.conf">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/moduleconf</configfile>
+    </feature>
 </features>
index 039060b..01156cf 100644 (file)
@@ -26,5 +26,6 @@
     <module>netconf</module>
     <module>protocol-framework</module>
     <module>adsal-compatibility</module>
+    <module>akka</module>
   </modules>
 </project>
\ No newline at end of file
index 2e95307..b05170d 100644 (file)
     <commmons.northbound.version>0.4.2-SNAPSHOT</commmons.northbound.version>
     <!-- Third Party Versions -->
     <codahale.metrics.version>3.0.1</codahale.metrics.version>
-    <commons.catalina>7.0.32.v201211201336</commons.catalina>
-    <commons.catalina.ha>7.0.32.v201211201952</commons.catalina.ha>
-    <commons.catalina.tribes>7.0.32.v201211201952</commons.catalina.tribes>
-    <commons.coyote>7.0.32.v201211201952</commons.coyote>
-    <commons.el>7.0.32.v201211081135</commons.el>
-    <commons.jasper>7.0.32.v201211201952</commons.jasper>
-    <commons.juli.version>7.0.32.v201211081135</commons.juli.version>
-    <commons.tomcat.api>7.0.32.v201211081135</commons.tomcat.api>
-    <commons.tomcat.util>7.0.32.v201211201952</commons.tomcat.util>
 
-    <commons.karaf.catalina>7.0.53.v201406061610</commons.karaf.catalina>
-    <commons.karaf.catalina.ha>7.0.53.v201406070630</commons.karaf.catalina.ha>
-    <commons.karaf.catalina.tribes>7.0.53.v201406070630</commons.karaf.catalina.tribes>
-    <commons.karaf.coyote>7.0.53.v201406070630</commons.karaf.coyote>
-    <commons.karaf.el>7.0.53.v201406060720</commons.karaf.el>
-    <commons.karaf.jasper>7.0.53.v201406070630</commons.karaf.jasper>
-    <commons.karaf.juli.version>7.0.53.v201406060720</commons.karaf.juli.version>
-    <commons.karaf.tomcat.api>7.0.53.v201406060720</commons.karaf.tomcat.api>
-    <commons.karaf.tomcat.util>7.0.53.v201406070630</commons.karaf.tomcat.util>
+    <commons.catalina>7.0.53.v201406061610</commons.catalina>
+    <commons.catalina.ha>7.0.53.v201406070630</commons.catalina.ha>
+    <commons.catalina.tribes>7.0.53.v201406070630</commons.catalina.tribes>
+    <commons.coyote>7.0.53.v201406070630</commons.coyote>
+    <commons.el>7.0.53.v201406060720</commons.el>
+    <commons.jasper>7.0.53.v201406070630</commons.jasper>
+    <commons.juli.version>7.0.53.v201406060720</commons.juli.version>
+    <commons.tomcat.api>7.0.53.v201406060720</commons.tomcat.api>
+    <commons.tomcat.util>7.0.53.v201406070630</commons.tomcat.util>
 
     <commons.checkstyle.version>0.0.3-SNAPSHOT</commons.checkstyle.version>
     <commons.fileupload.version>1.2.2</commons.fileupload.version>
@@ -77,6 +68,7 @@
     <concurrentlinkedhashmap.version>1.4</concurrentlinkedhashmap.version>
     <config.version>0.2.5-SNAPSHOT</config.version>
     <config.configfile.directory>etc/opendaylight/karaf</config.configfile.directory>
+    <config.clustering.configfile>05-clustering.xml</config.clustering.configfile>
     <config.netty.configfile>00-netty.xml</config.netty.configfile>
     <config.mdsal.configfile>01-mdsal.xml</config.mdsal.configfile>
     <config.xsql.configfile>04-xsql.xml</config.xsql.configfile>
     <topologymanager.shell.version>1.0.0-SNAPSHOT</topologymanager.shell.version>
     <troubleshoot.web.version>0.4.2-SNAPSHOT</troubleshoot.web.version>
     <typesafe.config.version>1.2.0</typesafe.config.version>
-    <uncommons.maths.version>1.2.2</uncommons.maths.version>
+    <uncommons.maths.version>1.2.2a</uncommons.maths.version>
     <usermanager.implementation.version>0.4.2-SNAPSHOT</usermanager.implementation.version>
     <usermanager.northbound.version>0.0.2-SNAPSHOT</usermanager.northbound.version>
     <usermanager.version>0.4.2-SNAPSHOT</usermanager.version>
index c2ac77a..a644bf6 100644 (file)
@@ -92,6 +92,11 @@ ovsdb.listenPort=6640
 # default Openflow version = 1.0, we also support 1.3.
 # ovsdb.of.version=1.3
 
+# ovsdb can be configured with ml2 to perform l3 forwarding. When used in that scenario, the mac address of the default
+# gateway --on the external subnet-- is expected to be resolved from its inet address. The config below overrides that
+# specific arp/neighDiscovery lookup.
+# ovsdb.l3gateway.mac=00:00:5E:00:02:01
+
 # TLS configuration
 # To enable TLS, set secureChannelEnabled=true and specify the location of controller Java KeyStore and TrustStore files.
 # The Java KeyStore contains controller's private key and certificate. The Java TrustStore contains the trusted certificate
index 4ae35c9..fcb452f 100644 (file)
       <groupId>org.opendaylight.controller.thirdparty</groupId>
       <artifactId>net.sf.jung2</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.opendaylight.controller.thirdparty</groupId>
-      <artifactId>org.apache.catalina.filters.CorsFilter</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller.thirdparty</groupId>
       <artifactId>org.openflow.openflowj</artifactId>
           <groupId>org.opendaylight.controller</groupId>
           <artifactId>sal-restconf-broker</artifactId>
         </dependency>
-        <dependency>
-          <groupId>org.opendaylight.controller</groupId>
-          <artifactId>sal-remoterpc-connector</artifactId>
-        </dependency>
 
 
         <dependency>
           <artifactId>jeromq</artifactId>
           <version>0.3.1</version>
         </dependency>
-        <dependency>
-          <groupId>org.opendaylight.controller</groupId>
-          <artifactId>sal-distributed-datastore</artifactId>
-        </dependency>
         <dependency>
           <groupId>org.opendaylight.controller</groupId>
           <artifactId>sal-clustering-config</artifactId>
                 <phase>generate-resources</phase>
                 <configuration>
                    <outputDirectory>${project.build.directory}/configuration</outputDirectory>
-                   <includeArtifactIds>sal-rest-connector-config,config-netty-config,md-sal-config,netconf-config,toaster-config,netconf-connector-config</includeArtifactIds>
-                   <includes>**\/*.xml</includes>
+                    <includeArtifactIds>sal-rest-connector-config,config-netty-config,md-sal-config,netconf-config,toaster-config,netconf-connector-config,sal-clustering-config</includeArtifactIds>
+                    <includes>**\/*.xml,**/*.conf</includes>
                    <excludeTransitive>true</excludeTransitive>
                    <ignorePermissions>false</ignorePermissions>
                 </configuration>
index b2fc3cb..530e46e 100644 (file)
@@ -116,6 +116,11 @@ ovsdb.listenPort=6640
 # default Openflow version = 1.3, we also support 1.0.
 ovsdb.of.version=1.3
 
+# ovsdb can be configured with ml2 to perform l3 forwarding. When used in that scenario, the mac address of the default
+# gateway --on the external subnet-- is expected to be resolved from its inet address. The config below overrides that
+# specific arp/neighDiscovery lookup.
+# ovsdb.l3gateway.mac=00:00:5E:00:02:01
+
 # TLS configuration
 # To enable TLS, set secureChannelEnabled=true and specify the location of controller Java KeyStore and TrustStore files.
 # The Java KeyStore contains controller's private key and certificate. The Java TrustStore contains the trusted certificate
index 325005b..98c81c2 100644 (file)
@@ -97,9 +97,8 @@
         <configuration>
           <instructions>
             <Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
-            <Export-package></Export-package>
-            <Private-Package></Private-Package>
-            <Import-Package></Import-Package>
+            <Export-package>org.opendaylight.cluster.raft</Export-package>
+            <Import-Package>*</Import-Package>
           </instructions>
         </configuration>
       </plugin>
index cbd7ca2..c4ff108 100644 (file)
@@ -12,14 +12,21 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.japi.Creator;
 import com.google.common.base.Optional;
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
 import org.opendaylight.controller.cluster.example.messages.PrintRole;
 import org.opendaylight.controller.cluster.example.messages.PrintState;
 import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -82,14 +89,63 @@ public class ExampleActor extends RaftActor {
         }
     }
 
-    @Override protected Object createSnapshot() {
-        return state;
+    @Override protected void createSnapshot() {
+        ByteString bs = null;
+        try {
+            bs = fromObject(state);
+        } catch (Exception e) {
+            LOG.error("Exception in creating snapshot", e);
+        }
+        getSelf().tell(new CaptureSnapshotReply(bs), null);
     }
 
-    @Override protected void applySnapshot(Object snapshot) {
+    @Override protected void applySnapshot(ByteString snapshot) {
         state.clear();
-        state.putAll((HashMap) snapshot);
-        LOG.debug("Snapshot applied to state :" + ((HashMap) snapshot).size());
+        try {
+            state.putAll((HashMap) toObject(snapshot));
+        } catch (Exception e) {
+           LOG.error("Exception in applying snapshot", e);
+        }
+        LOG.debug("Snapshot applied to state :" + ((HashMap) state).size());
+    }
+
+    private ByteString fromObject(Object snapshot) throws Exception {
+        ByteArrayOutputStream b = null;
+        ObjectOutputStream o = null;
+        try {
+            b = new ByteArrayOutputStream();
+            o = new ObjectOutputStream(b);
+            o.writeObject(snapshot);
+            byte[] snapshotBytes = b.toByteArray();
+            return ByteString.copyFrom(snapshotBytes);
+        } finally {
+            if (o != null) {
+                o.flush();
+                o.close();
+            }
+            if (b != null) {
+                b.close();
+            }
+        }
+    }
+
+    private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
+        Object obj = null;
+        ByteArrayInputStream bis = null;
+        ObjectInputStream ois = null;
+        try {
+            bis = new ByteArrayInputStream(bs.toByteArray());
+            ois = new ObjectInputStream(bis);
+            obj = ois.readObject();
+        } finally {
+            if (bis != null) {
+                bis.close();
+            }
+            if (ois != null) {
+                ois.close();
+            }
+        }
+        return obj;
     }
 
     @Override protected void onStateChanged() {
index d11377d..6192cad 100644 (file)
@@ -17,4 +17,9 @@ public class ExampleConfigParamsImpl extends DefaultConfigParamsImpl {
     public long getSnapshotBatchCount() {
         return 50;
     }
+
+    @Override
+    public int getSnapshotChunkSize() {
+        return 50;
+    }
 }
index fd6e192..978ea91 100644 (file)
@@ -109,6 +109,8 @@ public class TestDriver {
                 td.printState();
             } else if (command.startsWith("printNodes")) {
                 td.printNodes();
+            } else {
+                System.out.println("Invalid command:" + command);
             }
 
         }
index b5b034a..b436bce 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import com.google.protobuf.ByteString;
+
 import java.util.ArrayList;
 import java.util.List;
 
@@ -16,12 +18,18 @@ import java.util.List;
  */
 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
-    protected final List<ReplicatedLogEntry> journal;
-    protected final Object snapshot;
+    protected List<ReplicatedLogEntry> journal;
+    protected ByteString snapshot;
     protected long snapshotIndex = -1;
     protected long snapshotTerm = -1;
 
-    public AbstractReplicatedLogImpl(Object state, long snapshotIndex,
+    // to be used for rollback during save snapshot failure
+    protected List<ReplicatedLogEntry> snapshottedJournal;
+    protected ByteString previousSnapshot;
+    protected long previousSnapshotIndex = -1;
+    protected long previousSnapshotTerm = -1;
+
+    public AbstractReplicatedLogImpl(ByteString state, long snapshotIndex,
         long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
         this.snapshot = state;
         this.snapshotIndex = snapshotIndex;
@@ -137,11 +145,11 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     @Override
     public boolean isInSnapshot(long logEntryIndex) {
-        return logEntryIndex <= snapshotIndex;
+        return logEntryIndex <= snapshotIndex && snapshotIndex != -1;
     }
 
     @Override
-    public Object getSnapshot() {
+    public ByteString getSnapshot() {
         return snapshot;
     }
 
@@ -160,4 +168,68 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     @Override
     public abstract void removeFromAndPersist(long index);
+
+    @Override
+    public void setSnapshotIndex(long snapshotIndex) {
+        this.snapshotIndex = snapshotIndex;
+    }
+
+    @Override
+    public void setSnapshotTerm(long snapshotTerm) {
+        this.snapshotTerm = snapshotTerm;
+    }
+
+    @Override
+    public void setSnapshot(ByteString snapshot) {
+        this.snapshot = snapshot;
+    }
+
+    @Override
+    public void clear(int startIndex, int endIndex) {
+        journal.subList(startIndex, endIndex).clear();
+    }
+
+    @Override
+    public void snapshotPreCommit(ByteString snapshot, long snapshotCapturedIndex, long snapshotCapturedTerm) {
+        snapshottedJournal = new ArrayList<>(journal.size());
+
+        snapshottedJournal.addAll(journal.subList(0, (int)(snapshotCapturedIndex - snapshotIndex)));
+        clear(0, (int) (snapshotCapturedIndex - snapshotIndex));
+
+        previousSnapshotIndex = snapshotIndex;
+        setSnapshotIndex(snapshotCapturedIndex);
+
+        previousSnapshotTerm = snapshotTerm;
+        setSnapshotTerm(snapshotCapturedTerm);
+
+        previousSnapshot = getSnapshot();
+        setSnapshot(snapshot);
+    }
+
+    @Override
+    public void snapshotCommit() {
+        snapshottedJournal.clear();
+        snapshottedJournal = null;
+        previousSnapshotIndex = -1;
+        previousSnapshotTerm = -1;
+        previousSnapshot = null;
+    }
+
+    @Override
+    public void snapshotRollback() {
+        snapshottedJournal.addAll(journal);
+        journal.clear();
+        journal = snapshottedJournal;
+        snapshottedJournal = null;
+
+        snapshotIndex = previousSnapshotIndex;
+        previousSnapshotIndex = -1;
+
+        snapshotTerm = previousSnapshotTerm;
+        previousSnapshotTerm = -1;
+
+        snapshot = previousSnapshot;
+        previousSnapshot = null;
+
+    }
 }
index 4c6434a..ed6439d 100644 (file)
@@ -52,4 +52,9 @@ public interface ConfigParams {
      * @return int
      */
     public int getElectionTimeVariance();
+
+    /**
+     * The size (in bytes) of the snapshot chunk sent from Leader
+     */
+    public int getSnapshotChunkSize();
 }
index 6432fa4..75c237f 100644 (file)
@@ -25,6 +25,8 @@ public class DefaultConfigParamsImpl implements ConfigParams {
      */
     private static final int ELECTION_TIME_MAX_VARIANCE = 100;
 
+    private final int SNAPSHOT_CHUNK_SIZE = 2048 * 1000; //2MB
+
 
     /**
      * The interval at which a heart beat message will be sent to the remote
@@ -58,4 +60,9 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     public int getElectionTimeVariance() {
         return ELECTION_TIME_MAX_VARIANCE;
     }
+
+    @Override
+    public int getSnapshotChunkSize() {
+        return SNAPSHOT_CHUNK_SIZE;
+    }
 }
index 27e8f34..8135d83 100644 (file)
@@ -19,10 +19,14 @@ import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
 import akka.persistence.UntypedPersistentActor;
+import com.google.common.base.Optional;
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
-import com.google.common.base.Optional;
+import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
@@ -31,10 +35,11 @@ import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 
 import java.io.Serializable;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -98,6 +103,9 @@ public abstract class RaftActor extends UntypedPersistentActor {
      */
     private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
 
+    private CaptureSnapshot captureSnapshot = null;
+
+    private volatile boolean hasSnapshotCaptureInitiated = false;
 
     public RaftActor(String id, Map<String, String> peerAddresses) {
         this(id, peerAddresses, Optional.<ConfigParams>absent());
@@ -125,6 +133,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
             replicatedLog = new ReplicatedLogImpl(snapshot);
 
             context.setReplicatedLog(replicatedLog);
+            context.setLastApplied(snapshot.getLastAppliedIndex());
 
             LOG.debug("Applied snapshot to replicatedLog. " +
                 "snapshotIndex={}, snapshotTerm={}, journal-size={}",
@@ -132,7 +141,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
                 replicatedLog.size());
 
             // Apply the snapshot to the actors state
-            applySnapshot(snapshot.getState());
+            applySnapshot(ByteString.copyFrom(snapshot.getState()));
 
         } else if (message instanceof ReplicatedLogEntry) {
             replicatedLog.append((ReplicatedLogEntry) message);
@@ -164,7 +173,17 @@ public abstract class RaftActor extends UntypedPersistentActor {
                 applyState.getReplicatedLogEntry().getData());
 
         } else if(message instanceof ApplySnapshot ) {
-            applySnapshot(((ApplySnapshot) message).getSnapshot());
+            Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
+
+            LOG.debug("ApplySnapshot called on Follower Actor " +
+                "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
+                snapshot.getLastAppliedTerm());
+            applySnapshot(ByteString.copyFrom(snapshot.getState()));
+
+            //clears the followers log, sets the snapshot index to ensure adjusted-index works
+            replicatedLog = new ReplicatedLogImpl(snapshot);
+            context.setReplicatedLog(replicatedLog);
+            context.setLastApplied(snapshot.getLastAppliedIndex());
 
         } else if (message instanceof FindLeader) {
             getSender().tell(
@@ -174,13 +193,26 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
         } else if (message instanceof SaveSnapshotSuccess) {
             SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
+            LOG.info("SaveSnapshotSuccess received for snapshot");
+
+            context.getReplicatedLog().snapshotCommit();
 
             // TODO: Not sure if we want to be this aggressive with trimming stuff
             trimPersistentData(success.metadata().sequenceNr());
 
         } else if (message instanceof SaveSnapshotFailure) {
+            SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
+
+            LOG.info("saveSnapshotFailure.metadata():{}", saveSnapshotFailure.metadata().toString());
+            LOG.error(saveSnapshotFailure.cause(), "SaveSnapshotFailure received for snapshot Cause:");
 
-            // TODO: Handle failure in saving the snapshot
+            context.getReplicatedLog().snapshotRollback();
+
+            LOG.info("Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
+                "snapshotIndex:{}, snapshotTerm:{}, log-size:{}",
+                context.getReplicatedLog().getSnapshotIndex(),
+                context.getReplicatedLog().getSnapshotTerm(),
+                context.getReplicatedLog().size());
 
         } else if (message instanceof AddRaftPeer){
 
@@ -196,7 +228,25 @@ public abstract class RaftActor extends UntypedPersistentActor {
             RemoveRaftPeer rrp = (RemoveRaftPeer)message;
             context.removePeer(rrp.getName());
 
+        } else if (message instanceof CaptureSnapshot) {
+            LOG.debug("CaptureSnapshot received by actor");
+            CaptureSnapshot cs = (CaptureSnapshot)message;
+            captureSnapshot = cs;
+            createSnapshot();
+
+        } else if (message instanceof CaptureSnapshotReply){
+            LOG.debug("CaptureSnapshotReply received by actor");
+            CaptureSnapshotReply csr = (CaptureSnapshotReply) message;
+
+            ByteString stateInBytes = csr.getSnapshot();
+            LOG.debug("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
+            handleCaptureSnapshotReply(stateInBytes);
+
         } else {
+            if (!(message instanceof AppendEntriesMessages.AppendEntries)
+                && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
+                LOG.debug("onReceiveCommand: message:" + message.getClass());
+            }
 
             RaftState state =
                 currentBehavior.handleMessage(getSender(), message);
@@ -348,7 +398,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
      *
      * @return The current state of the actor
      */
-    protected abstract Object createSnapshot();
+    protected abstract void createSnapshot();
 
     /**
      * This method will be called by the RaftActor during recovery to
@@ -360,7 +410,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
      *
      * @param snapshot A snapshot of the state of the actor
      */
-    protected abstract void applySnapshot(Object snapshot);
+    protected abstract void applySnapshot(ByteString snapshot);
 
     /**
      * This method will be called by the RaftActor when the state of the
@@ -427,11 +477,39 @@ public abstract class RaftActor extends UntypedPersistentActor {
         return peerAddress;
     }
 
+    private void handleCaptureSnapshotReply(ByteString stateInBytes) {
+        // create a snapshot object from the state provided and save it
+        // when snapshot is saved async, SaveSnapshotSuccess is raised.
+
+        Snapshot sn = Snapshot.create(stateInBytes.toByteArray(),
+            context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
+            captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
+            captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
+
+        saveSnapshot(sn);
+
+        LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
+
+        //be greedy and remove entries from in-mem journal which are in the snapshot
+        // and update snapshotIndex and snapshotTerm without waiting for the success,
+
+        context.getReplicatedLog().snapshotPreCommit(stateInBytes,
+            captureSnapshot.getLastAppliedIndex(),
+            captureSnapshot.getLastAppliedTerm());
+
+        LOG.info("Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
+            "and term:{}", captureSnapshot.getLastAppliedIndex(),
+            captureSnapshot.getLastAppliedTerm());
+
+        captureSnapshot = null;
+        hasSnapshotCaptureInitiated = false;
+    }
+
 
     private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
 
         public ReplicatedLogImpl(Snapshot snapshot) {
-            super(snapshot.getState(),
+            super(ByteString.copyFrom(snapshot.getState()),
                 snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
                 snapshot.getUnAppliedEntries());
         }
@@ -480,8 +558,10 @@ public abstract class RaftActor extends UntypedPersistentActor {
             persist(replicatedLogEntry,
                 new Procedure<ReplicatedLogEntry>() {
                     public void apply(ReplicatedLogEntry evt) throws Exception {
-                        // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned.
-                        if (journal.size() > context.getConfigParams().getSnapshotBatchCount()) {
+                        // when a snaphsot is being taken, captureSnapshot != null
+                        if (hasSnapshotCaptureInitiated == false &&
+                            journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0) {
+
                             LOG.info("Initiating Snapshot Capture..");
                             long lastAppliedIndex = -1;
                             long lastAppliedTerm = -1;
@@ -497,26 +577,11 @@ public abstract class RaftActor extends UntypedPersistentActor {
                             LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
                             LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
 
-                            // create a snapshot object from the state provided and save it
-                            // when snapshot is saved async, SaveSnapshotSuccess is raised.
-                            Snapshot sn = Snapshot.create(createSnapshot(),
-                                getFrom(context.getLastApplied() + 1),
-                                lastIndex(), lastTerm(), lastAppliedIndex,
-                                lastAppliedTerm);
-                            saveSnapshot(sn);
-
-                            LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
-
-                            //be greedy and remove entries from in-mem journal which are in the snapshot
-                            // and update snapshotIndex and snapshotTerm without waiting for the success,
-                            // TODO: damage-recovery to be done on failure
-                            journal.subList(0, (int) (lastAppliedIndex - snapshotIndex)).clear();
-                            snapshotIndex = lastAppliedIndex;
-                            snapshotTerm = lastAppliedTerm;
-
-                            LOG.info("Removed in-memory snapshotted entries, " +
-                                "adjusted snaphsotIndex:{}" +
-                                "and term:{}", snapshotIndex, lastAppliedTerm);
+                            // send a CaptureSnapshot to self to make the expensive operation async.
+                            getSelf().tell(new CaptureSnapshot(
+                                lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
+                                null);
+                            hasSnapshotCaptureInitiated = true;
                         }
                         // Send message for replication
                         if (clientActor != null) {
@@ -546,65 +611,6 @@ public abstract class RaftActor extends UntypedPersistentActor {
     }
 
 
-    private static class Snapshot implements Serializable {
-        private final Object state;
-        private final List<ReplicatedLogEntry> unAppliedEntries;
-        private final long lastIndex;
-        private final long lastTerm;
-        private final long lastAppliedIndex;
-        private final long lastAppliedTerm;
-
-        private Snapshot(Object state,
-            List<ReplicatedLogEntry> unAppliedEntries, long lastIndex,
-            long lastTerm, long lastAppliedIndex, long lastAppliedTerm) {
-            this.state = state;
-            this.unAppliedEntries = unAppliedEntries;
-            this.lastIndex = lastIndex;
-            this.lastTerm = lastTerm;
-            this.lastAppliedIndex = lastAppliedIndex;
-            this.lastAppliedTerm = lastAppliedTerm;
-        }
-
-
-        public static Snapshot create(Object state,
-            List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
-            long lastAppliedIndex, long lastAppliedTerm) {
-            return new Snapshot(state, entries, lastIndex, lastTerm,
-                lastAppliedIndex, lastAppliedTerm);
-        }
-
-        public Object getState() {
-            return state;
-        }
-
-        public List<ReplicatedLogEntry> getUnAppliedEntries() {
-            return unAppliedEntries;
-        }
-
-        public long getLastTerm() {
-            return lastTerm;
-        }
-
-        public long getLastAppliedIndex() {
-            return lastAppliedIndex;
-        }
-
-        public long getLastAppliedTerm() {
-            return lastAppliedTerm;
-        }
-
-        public String getLogMessage() {
-            StringBuilder sb = new StringBuilder();
-            return sb.append("Snapshot={")
-                .append("lastTerm:" + this.getLastTerm()  + ", ")
-                .append("LastAppliedIndex:" + this.getLastAppliedIndex()  + ", ")
-                .append("LastAppliedTerm:" + this.getLastAppliedTerm()  + ", ")
-                .append("UnAppliedEntries size:" + this.getUnAppliedEntries().size()  + "}")
-                .toString();
-
-        }
-    }
-
     private class ElectionTermImpl implements ElectionTerm {
         /**
          * Identifier of the actor whose election term information this is
index e6e160b..c17f544 100644 (file)
@@ -8,6 +8,8 @@
 
 package org.opendaylight.controller.cluster.raft;
 
+import com.google.protobuf.ByteString;
+
 import java.util.List;
 
 /**
@@ -118,7 +120,7 @@ public interface ReplicatedLog {
      *
      * @return an object representing the snapshot if it exists. null otherwise
      */
-    Object getSnapshot();
+    ByteString getSnapshot();
 
     /**
      * Get the index of the snapshot
@@ -134,4 +136,49 @@ public interface ReplicatedLog {
      * otherwise
      */
     long getSnapshotTerm();
+
+    /**
+     * sets the snapshot index in the replicated log
+     * @param snapshotIndex
+     */
+    void setSnapshotIndex(long snapshotIndex);
+
+    /**
+     * sets snapshot term
+     * @param snapshotTerm
+     */
+    public void setSnapshotTerm(long snapshotTerm);
+
+    /**
+     * sets the snapshot in bytes
+     * @param snapshot
+     */
+    public void setSnapshot(ByteString snapshot);
+
+    /**
+     * Clears the journal entries with startIndex(inclusive) and endIndex (exclusive)
+     * @param startIndex
+     * @param endIndex
+     */
+    public void clear(int startIndex, int endIndex);
+
+    /**
+     * Handles all the bookkeeping in order to perform a rollback in the
+     * event of SaveSnapshotFailure
+     * @param snapshot
+     * @param snapshotCapturedIndex
+     * @param snapshotCapturedTerm
+     */
+    public void snapshotPreCommit(ByteString snapshot,
+        long snapshotCapturedIndex, long snapshotCapturedTerm);
+
+    /**
+     * Sets the Replicated log to state after snapshot success.
+     */
+    public void snapshotCommit();
+
+    /**
+     * Restores the replicated log to a state in the event of a save snapshot failure
+     */
+    public void snapshotRollback();
 }
index 374e0fa..2f5ba48 100644 (file)
@@ -9,12 +9,16 @@
 package org.opendaylight.controller.cluster.raft;
 
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 
 public class SerializationUtils {
 
     public static Object fromSerializable(Object serializable){
         if(serializable.getClass().equals(AppendEntries.SERIALIZABLE_CLASS)){
             return AppendEntries.fromSerializable(serializable);
+
+        } else if (serializable.getClass().equals(InstallSnapshot.SERIALIZABLE_CLASS)) {
+            return InstallSnapshot.fromSerializable(serializable);
         }
         return serializable;
     }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java
new file mode 100644 (file)
index 0000000..8e0fcca
--- /dev/null
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import java.io.Serializable;
+import java.util.List;
+
+
+public class Snapshot implements Serializable {
+    private final byte[] state;
+    private final List<ReplicatedLogEntry> unAppliedEntries;
+    private final long lastIndex;
+    private final long lastTerm;
+    private final long lastAppliedIndex;
+    private final long lastAppliedTerm;
+
+    private Snapshot(byte[] state,
+        List<ReplicatedLogEntry> unAppliedEntries, long lastIndex,
+        long lastTerm, long lastAppliedIndex, long lastAppliedTerm) {
+        this.state = state;
+        this.unAppliedEntries = unAppliedEntries;
+        this.lastIndex = lastIndex;
+        this.lastTerm = lastTerm;
+        this.lastAppliedIndex = lastAppliedIndex;
+        this.lastAppliedTerm = lastAppliedTerm;
+    }
+
+
+    public static Snapshot create(byte[] state,
+        List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
+        long lastAppliedIndex, long lastAppliedTerm) {
+        return new Snapshot(state, entries, lastIndex, lastTerm,
+            lastAppliedIndex, lastAppliedTerm);
+    }
+
+    public byte[] getState() {
+        return state;
+    }
+
+    public List<ReplicatedLogEntry> getUnAppliedEntries() {
+        return unAppliedEntries;
+    }
+
+    public long getLastTerm() {
+        return lastTerm;
+    }
+
+    public long getLastAppliedIndex() {
+        return lastAppliedIndex;
+    }
+
+    public long getLastAppliedTerm() {
+        return lastAppliedTerm;
+    }
+
+    public long getLastIndex() {
+        return this.lastIndex;
+    }
+
+    public String getLogMessage() {
+        StringBuilder sb = new StringBuilder();
+        return sb.append("Snapshot={")
+            .append("lastTerm:" + this.getLastTerm() + ", ")
+            .append("lastIndex:" + this.getLastIndex()  + ", ")
+            .append("LastAppliedIndex:" + this.getLastAppliedIndex()  + ", ")
+            .append("LastAppliedTerm:" + this.getLastAppliedTerm()  + ", ")
+            .append("UnAppliedEntries size:" + this.getUnAppliedEntries().size()  + "}")
+            .toString();
+
+    }
+}
index 9739fb2..c356804 100644 (file)
@@ -8,16 +8,21 @@
 
 package org.opendaylight.controller.cluster.raft.base.messages;
 
+import org.opendaylight.controller.cluster.raft.Snapshot;
+
 import java.io.Serializable;
 
+/**
+ * Internal message, issued by follower to its actor
+ */
 public class ApplySnapshot implements Serializable {
-    private final Object snapshot;
+    private final Snapshot snapshot;
 
-    public ApplySnapshot(Object snapshot) {
+    public ApplySnapshot(Snapshot snapshot) {
         this.snapshot = snapshot;
     }
 
-    public Object getSnapshot() {
+    public Snapshot getSnapshot() {
         return snapshot;
     }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java
new file mode 100644 (file)
index 0000000..bb86e1a
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+public class CaptureSnapshot {
+    private long lastAppliedIndex;
+    private long lastAppliedTerm;
+    private long lastIndex;
+    private long lastTerm;
+
+    public CaptureSnapshot(long lastIndex, long lastTerm,
+        long lastAppliedIndex, long lastAppliedTerm) {
+        this.lastIndex = lastIndex;
+        this.lastTerm = lastTerm;
+        this.lastAppliedIndex = lastAppliedIndex;
+        this.lastAppliedTerm = lastAppliedTerm;
+    }
+
+    public long getLastAppliedIndex() {
+        return lastAppliedIndex;
+    }
+
+    public long getLastAppliedTerm() {
+        return lastAppliedTerm;
+    }
+
+    public long getLastIndex() {
+        return lastIndex;
+    }
+
+    public long getLastTerm() {
+        return lastTerm;
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java
new file mode 100644 (file)
index 0000000..96150db
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+import com.google.protobuf.ByteString;
+
+public class CaptureSnapshotReply {
+    private ByteString snapshot;
+
+    public CaptureSnapshotReply(ByteString snapshot) {
+        this.snapshot = snapshot;
+    }
+
+    public ByteString getSnapshot() {
+        return snapshot;
+    }
+
+    public void setSnapshot(ByteString snapshot) {
+        this.snapshot = snapshot;
+    }
+}
index 251a13d..7e896fe 100644 (file)
@@ -305,6 +305,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      * @param index a log index that is known to be committed
      */
     protected void applyLogToStateMachine(final long index) {
+        long newLastApplied = context.getLastApplied();
         // Now maybe we apply to the state machine
         for (long i = context.getLastApplied() + 1;
              i < index + 1; i++) {
@@ -322,15 +323,19 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
             if (replicatedLogEntry != null) {
                 actor().tell(new ApplyState(clientActor, identifier,
                     replicatedLogEntry), actor());
+                newLastApplied = i;
             } else {
+                //if one index is not present in the log, no point in looping
+                // around as the rest wont be present either
                 context.getLogger().error(
-                    "Missing index " + i + " from log. Cannot apply state.");
+                    "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index );
+                break;
             }
         }
         // Send a local message to the local RaftActor (it's derived class to be
         // specific to apply the log to it's index)
-        context.getLogger().debug("Setting last applied to {}", index);
-        context.setLastApplied(index);
+        context.getLogger().debug("Setting last applied to {}", newLastApplied);
+        context.setLastApplied(newLastApplied);
     }
 
     protected Object fromSerializableMessage(Object serializable){
index 54e0494..610fdc9 100644 (file)
@@ -9,17 +9,22 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 
+import java.util.ArrayList;
+
 /**
  * The behavior of a RaftActor in the Follower state
  * <p/>
@@ -31,6 +36,8 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
  * </ul>
  */
 public class Follower extends AbstractRaftActorBehavior {
+    private ByteString snapshotChunksCollected = ByteString.EMPTY;
+
     public Follower(RaftActorContext context) {
         super(context);
 
@@ -106,6 +113,9 @@ public class Follower extends AbstractRaftActorBehavior {
         if (outOfSync) {
             // We found that the log was out of sync so just send a negative
             // reply and return
+            context.getLogger().debug("Follower is out-of-sync, " +
+                "so sending negative reply, lastIndex():{}, lastTerm():{}",
+                lastIndex(), lastTerm());
             sender.tell(
                 new AppendEntriesReply(context.getId(), currentTerm(), false,
                     lastIndex(), lastTerm()), actor()
@@ -191,7 +201,13 @@ public class Follower extends AbstractRaftActorBehavior {
 
         // If commitIndex > lastApplied: increment lastApplied, apply
         // log[lastApplied] to state machine (ยง5.3)
-        if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
+        // check if there are any entries to be applied. last-applied can be equal to last-index
+        if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
+            context.getLastApplied() < lastIndex()) {
+            context.getLogger().debug("applyLogToStateMachine, " +
+                "appendEntries.getLeaderCommit():{}," +
+                "context.getLastApplied():{}, lastIndex():{}",
+                appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex());
             applyLogToStateMachine(appendEntries.getLeaderCommit());
         }
 
@@ -234,7 +250,7 @@ public class Follower extends AbstractRaftActorBehavior {
 
         } else if (message instanceof InstallSnapshot) {
             InstallSnapshot installSnapshot = (InstallSnapshot) message;
-            actor().tell(new ApplySnapshot(installSnapshot.getData()), actor());
+            handleInstallSnapshot(sender, installSnapshot);
         }
 
         scheduleElection(electionDuration());
@@ -242,6 +258,47 @@ public class Follower extends AbstractRaftActorBehavior {
         return super.handleMessage(sender, message);
     }
 
+    private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
+        context.getLogger().debug("InstallSnapshot received by follower " +
+            "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(),
+            installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
+
+        try {
+            if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) {
+                // this is the last chunk, create a snapshot object and apply
+
+                snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+                context.getLogger().debug("Last chunk received: snapshotChunksCollected.size:{}",
+                    snapshotChunksCollected.size());
+
+                Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(),
+                    new ArrayList<ReplicatedLogEntry>(),
+                    installSnapshot.getLastIncludedIndex(),
+                    installSnapshot.getLastIncludedTerm(),
+                    installSnapshot.getLastIncludedIndex(),
+                    installSnapshot.getLastIncludedTerm());
+
+                actor().tell(new ApplySnapshot(snapshot), actor());
+
+            } else {
+                // we have more to go
+                snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+                context.getLogger().debug("Chunk={},snapshotChunksCollected.size:{}",
+                    installSnapshot.getChunkIndex(), snapshotChunksCollected.size());
+            }
+
+            sender.tell(new InstallSnapshotReply(
+                currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
+                true), actor());
+
+        } catch (Exception e) {
+            context.getLogger().error("Exception in InstallSnapshot of follower", e);
+            //send reply with success as false. The chunk will be sent again on failure
+            sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
+                installSnapshot.getChunkIndex(), false), actor());
+        }
+    }
+
     @Override public void close() throws Exception {
         stopElection();
     }
index 234f9db..90948ff 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
 import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
@@ -30,6 +31,7 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -64,8 +66,9 @@ import java.util.concurrent.atomic.AtomicLong;
 public class Leader extends AbstractRaftActorBehavior {
 
 
-    private final Map<String, FollowerLogInformation> followerToLog =
+    protected final Map<String, FollowerLogInformation> followerToLog =
         new HashMap();
+    protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
 
     private final Set<String> followers;
 
@@ -246,16 +249,48 @@ public class Leader extends AbstractRaftActorBehavior {
         return super.handleMessage(sender, message);
     }
 
-    private void handleInstallSnapshotReply(InstallSnapshotReply message) {
-        InstallSnapshotReply reply = message;
+    private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
         String followerId = reply.getFollowerId();
-        FollowerLogInformation followerLogInformation =
-            followerToLog.get(followerId);
+        FollowerToSnapshot followerToSnapshot =
+            mapFollowerToSnapshot.get(followerId);
+
+        if (followerToSnapshot != null &&
+            followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
+
+            if (reply.isSuccess()) {
+                if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
+                    //this was the last chunk reply
+                    context.getLogger().debug("InstallSnapshotReply received, " +
+                        "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
+                        reply.getChunkIndex(), followerId,
+                        context.getReplicatedLog().getSnapshotIndex() + 1);
+
+                    FollowerLogInformation followerLogInformation =
+                        followerToLog.get(followerId);
+                    followerLogInformation.setMatchIndex(
+                        context.getReplicatedLog().getSnapshotIndex());
+                    followerLogInformation.setNextIndex(
+                        context.getReplicatedLog().getSnapshotIndex() + 1);
+                    mapFollowerToSnapshot.remove(followerId);
+                    context.getLogger().debug("followerToLog.get(followerId).getNextIndex().get()=" +
+                        followerToLog.get(followerId).getNextIndex().get());
+
+                } else {
+                    followerToSnapshot.markSendStatus(true);
+                }
+            } else {
+                context.getLogger().info("InstallSnapshotReply received, " +
+                    "sending snapshot chunk failed, Will retry, Chunk:{}",
+                    reply.getChunkIndex());
+                followerToSnapshot.markSendStatus(false);
+            }
 
-        followerLogInformation
-            .setMatchIndex(context.getReplicatedLog().getSnapshotIndex());
-        followerLogInformation
-            .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1);
+        } else {
+            context.getLogger().error("ERROR!!" +
+                "FollowerId in InstallSnapshotReply not known to Leader" +
+                " or Chunk Index in InstallSnapshotReply not matching {} != {}",
+                followerToSnapshot.getChunkIndex(), reply.getChunkIndex() );
+        }
     }
 
     private void replicate(Replicate replicate) {
@@ -282,30 +317,56 @@ public class Leader extends AbstractRaftActorBehavior {
     private void sendAppendEntries() {
         // Send an AppendEntries to all followers
         for (String followerId : followers) {
-            ActorSelection followerActor =
-                context.getPeerActorSelection(followerId);
+            ActorSelection followerActor = context.getPeerActorSelection(followerId);
 
             if (followerActor != null) {
-                FollowerLogInformation followerLogInformation =
-                    followerToLog.get(followerId);
-
-                long nextIndex = followerLogInformation.getNextIndex().get();
-
+                FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+                long followerNextIndex = followerLogInformation.getNextIndex().get();
                 List<ReplicatedLogEntry> entries = Collections.emptyList();
 
-                if (context.getReplicatedLog().isPresent(nextIndex)) {
-                    // FIXME : Sending one entry at a time
-                    entries =
-                        context.getReplicatedLog().getFrom(nextIndex, 1);
+                if (mapFollowerToSnapshot.get(followerId) != null) {
+                    if (mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+                        sendSnapshotChunk(followerActor, followerId);
+                    }
+
+                } else {
+
+                    if (context.getReplicatedLog().isPresent(followerNextIndex)) {
+                        // FIXME : Sending one entry at a time
+                        entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
+
+                        followerActor.tell(
+                            new AppendEntries(currentTerm(), context.getId(),
+                                prevLogIndex(followerNextIndex),
+                                prevLogTerm(followerNextIndex), entries,
+                                context.getCommitIndex()).toSerializable(),
+                            actor()
+                        );
+
+                    } else {
+                        // if the followers next index is not present in the leaders log, then snapshot should be sent
+                        long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
+                        long leaderLastIndex = context.getReplicatedLog().lastIndex();
+                        if (followerNextIndex >= 0 && leaderLastIndex >= followerNextIndex ) {
+                            // if the follower is just not starting and leader's index
+                            // is more than followers index
+                            context.getLogger().debug("SendInstallSnapshot to follower:{}," +
+                                "follower-nextIndex:{}, leader-snapshot-index:{},  " +
+                                "leader-last-index:{}", followerId,
+                                followerNextIndex, leaderSnapShotIndex, leaderLastIndex);
+
+                            actor().tell(new SendInstallSnapshot(), actor());
+                        } else {
+                            followerActor.tell(
+                                new AppendEntries(currentTerm(), context.getId(),
+                                    prevLogIndex(followerNextIndex),
+                                    prevLogTerm(followerNextIndex), entries,
+                                    context.getCommitIndex()).toSerializable(),
+                                actor()
+                            );
+                        }
+                    }
                 }
-
-                followerActor.tell(
-                    new AppendEntries(currentTerm(), context.getId(),
-                        prevLogIndex(nextIndex),
-                        prevLogTerm(nextIndex), entries,
-                        context.getCommitIndex()).toSerializable(),
-                    actor()
-                );
             }
         }
     }
@@ -326,21 +387,55 @@ public class Leader extends AbstractRaftActorBehavior {
 
                 long nextIndex = followerLogInformation.getNextIndex().get();
 
-                if (!context.getReplicatedLog().isPresent(nextIndex) && context
-                    .getReplicatedLog().isInSnapshot(nextIndex)) {
-                    followerActor.tell(
-                        new InstallSnapshot(currentTerm(), context.getId(),
-                            context.getReplicatedLog().getSnapshotIndex(),
-                            context.getReplicatedLog().getSnapshotTerm(),
-                            context.getReplicatedLog().getSnapshot()
-                        ),
-                        actor()
-                    );
+                if (!context.getReplicatedLog().isPresent(nextIndex) &&
+                    context.getReplicatedLog().isInSnapshot(nextIndex)) {
+                    sendSnapshotChunk(followerActor, followerId);
                 }
             }
         }
     }
 
+    /**
+     *  Sends a snapshot chunk to a given follower
+     *  InstallSnapshot should qualify as a heartbeat too.
+     */
+    private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
+        try {
+            followerActor.tell(
+                new InstallSnapshot(currentTerm(), context.getId(),
+                    context.getReplicatedLog().getSnapshotIndex(),
+                    context.getReplicatedLog().getSnapshotTerm(),
+                    getNextSnapshotChunk(followerId,
+                        context.getReplicatedLog().getSnapshot()),
+                    mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
+                    mapFollowerToSnapshot.get(followerId).getTotalChunks()
+                ).toSerializable(),
+                actor()
+            );
+            context.getLogger().info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
+                followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
+                mapFollowerToSnapshot.get(followerId).getTotalChunks());
+        } catch (IOException e) {
+            context.getLogger().error("InstallSnapshot failed for Leader.", e);
+        }
+    }
+
+    /**
+     * Acccepts snaphot as ByteString, enters into map for future chunks
+     * creates and return a ByteString chunk
+     */
+    private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
+        FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+        if (followerToSnapshot == null) {
+            followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
+            mapFollowerToSnapshot.put(followerId, followerToSnapshot);
+        }
+        ByteString nextChunk = followerToSnapshot.getNextChunk();
+        context.getLogger().debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
+
+        return nextChunk;
+    }
+
     private RaftState sendHeartBeat() {
         if (followers.size() > 0) {
             sendAppendEntries();
@@ -410,4 +505,97 @@ public class Leader extends AbstractRaftActorBehavior {
         return context.getId();
     }
 
+    /**
+     * Encapsulates the snapshot bytestring and handles the logic of sending
+     * snapshot chunks
+     */
+    protected class FollowerToSnapshot {
+        private ByteString snapshotBytes;
+        private int offset = 0;
+        // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
+        private int replyReceivedForOffset;
+        // if replyStatus is false, the previous chunk is attempted
+        private boolean replyStatus = false;
+        private int chunkIndex;
+        private int totalChunks;
+
+        public FollowerToSnapshot(ByteString snapshotBytes) {
+            this.snapshotBytes = snapshotBytes;
+            replyReceivedForOffset = -1;
+            chunkIndex = 1;
+            int size = snapshotBytes.size();
+            totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
+                ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
+            context.getLogger().debug("Snapshot {} bytes, total chunks to send:{}",
+                size, totalChunks);
+        }
+
+        public ByteString getSnapshotBytes() {
+            return snapshotBytes;
+        }
+
+        public int incrementOffset() {
+            if(replyStatus) {
+                // if prev chunk failed, we would want to sent the same chunk again
+                offset = offset + context.getConfigParams().getSnapshotChunkSize();
+            }
+            return offset;
+        }
+
+        public int incrementChunkIndex() {
+            if (replyStatus) {
+                // if prev chunk failed, we would want to sent the same chunk again
+                chunkIndex =  chunkIndex + 1;
+            }
+            return chunkIndex;
+        }
+
+        public int getChunkIndex() {
+            return chunkIndex;
+        }
+
+        public int getTotalChunks() {
+            return totalChunks;
+        }
+
+        public boolean canSendNextChunk() {
+            // we only send a false if a chunk is sent but we have not received a reply yet
+            return replyReceivedForOffset == offset;
+        }
+
+        public boolean isLastChunk(int chunkIndex) {
+            return totalChunks == chunkIndex;
+        }
+
+        public void markSendStatus(boolean success) {
+            if (success) {
+                // if the chunk sent was successful
+                replyReceivedForOffset = offset;
+                replyStatus = true;
+            } else {
+                // if the chunk sent was failure
+                replyReceivedForOffset = offset;
+                replyStatus = false;
+            }
+        }
+
+        public ByteString getNextChunk() {
+            int snapshotLength = getSnapshotBytes().size();
+            int start = incrementOffset();
+            int size = context.getConfigParams().getSnapshotChunkSize();
+            if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
+                size = snapshotLength;
+            } else {
+                if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
+                    size = snapshotLength - start;
+                }
+            }
+
+            context.getLogger().debug("length={}, offset={},size={}",
+                snapshotLength, start, size);
+            return getSnapshotBytes().substring(start, start + size);
+
+        }
+    }
+
 }
index 888854f..9d40fa3 100644 (file)
@@ -8,19 +8,29 @@
 
 package org.opendaylight.controller.cluster.raft.messages;
 
+import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages;
+
 public class InstallSnapshot extends AbstractRaftRPC {
 
+    public static final Class SERIALIZABLE_CLASS = InstallSnapshotMessages.InstallSnapshot.class;
+
     private final String leaderId;
     private final long lastIncludedIndex;
     private final long lastIncludedTerm;
-    private final Object data;
+    private final ByteString data;
+    private final int chunkIndex;
+    private final int totalChunks;
 
-    public InstallSnapshot(long term, String leaderId, long lastIncludedIndex, long lastIncludedTerm, Object data) {
+    public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
+        long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks) {
         super(term);
         this.leaderId = leaderId;
         this.lastIncludedIndex = lastIncludedIndex;
         this.lastIncludedTerm = lastIncludedTerm;
         this.data = data;
+        this.chunkIndex = chunkIndex;
+        this.totalChunks = totalChunks;
     }
 
     public String getLeaderId() {
@@ -35,7 +45,38 @@ public class InstallSnapshot extends AbstractRaftRPC {
         return lastIncludedTerm;
     }
 
-    public Object getData() {
+    public ByteString getData() {
         return data;
     }
+
+    public int getChunkIndex() {
+        return chunkIndex;
+    }
+
+    public int getTotalChunks() {
+        return totalChunks;
+    }
+
+    public <T extends Object> Object toSerializable(){
+        return InstallSnapshotMessages.InstallSnapshot.newBuilder()
+            .setLeaderId(this.getLeaderId())
+            .setChunkIndex(this.getChunkIndex())
+            .setData(this.getData())
+            .setLastIncludedIndex(this.getLastIncludedIndex())
+            .setLastIncludedTerm(this.getLastIncludedTerm())
+            .setTotalChunks(this.getTotalChunks()).build();
+
+    }
+
+    public static InstallSnapshot fromSerializable (Object o) {
+        InstallSnapshotMessages.InstallSnapshot from =
+            (InstallSnapshotMessages.InstallSnapshot) o;
+
+        InstallSnapshot installSnapshot = new InstallSnapshot(from.getTerm(),
+            from.getLeaderId(), from.getLastIncludedIndex(),
+            from.getLastIncludedTerm(), from.getData(),
+            from.getChunkIndex(), from.getTotalChunks());
+
+        return installSnapshot;
+    }
 }
index 85b89b7..d293a47 100644 (file)
@@ -13,13 +13,26 @@ public class InstallSnapshotReply extends AbstractRaftRPC {
     // The followerId - this will be used to figure out which follower is
     // responding
     private final String followerId;
+    private final int chunkIndex;
+    private boolean success;
 
-    protected InstallSnapshotReply(long term, String followerId) {
+    public InstallSnapshotReply(long term, String followerId, int chunkIndex,
+        boolean success) {
         super(term);
         this.followerId = followerId;
+        this.chunkIndex = chunkIndex;
+        this.success = success;
     }
 
     public String getFollowerId() {
         return followerId;
     }
+
+    public int getChunkIndex() {
+        return chunkIndex;
+    }
+
+    public boolean isSuccess() {
+        return success;
+    }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/InstallSnapshotMessages.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/InstallSnapshotMessages.java
new file mode 100644 (file)
index 0000000..e801ae1
--- /dev/null
@@ -0,0 +1,1015 @@
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: InstallSnapshot.proto
+
+package org.opendaylight.controller.cluster.raft.protobuff.messages;
+
+public final class InstallSnapshotMessages {
+  private InstallSnapshotMessages() {}
+  public static void registerAllExtensions(
+      com.google.protobuf.ExtensionRegistry registry) {
+  }
+  public interface InstallSnapshotOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+
+    // optional int64 term = 1;
+    /**
+     * <code>optional int64 term = 1;</code>
+     */
+    boolean hasTerm();
+    /**
+     * <code>optional int64 term = 1;</code>
+     */
+    long getTerm();
+
+    // optional string leaderId = 2;
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    boolean hasLeaderId();
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    java.lang.String getLeaderId();
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    com.google.protobuf.ByteString
+        getLeaderIdBytes();
+
+    // optional int64 lastIncludedIndex = 3;
+    /**
+     * <code>optional int64 lastIncludedIndex = 3;</code>
+     */
+    boolean hasLastIncludedIndex();
+    /**
+     * <code>optional int64 lastIncludedIndex = 3;</code>
+     */
+    long getLastIncludedIndex();
+
+    // optional int64 lastIncludedTerm = 4;
+    /**
+     * <code>optional int64 lastIncludedTerm = 4;</code>
+     */
+    boolean hasLastIncludedTerm();
+    /**
+     * <code>optional int64 lastIncludedTerm = 4;</code>
+     */
+    long getLastIncludedTerm();
+
+    // optional bytes data = 5;
+    /**
+     * <code>optional bytes data = 5;</code>
+     */
+    boolean hasData();
+    /**
+     * <code>optional bytes data = 5;</code>
+     */
+    com.google.protobuf.ByteString getData();
+
+    // optional int32 chunkIndex = 6;
+    /**
+     * <code>optional int32 chunkIndex = 6;</code>
+     */
+    boolean hasChunkIndex();
+    /**
+     * <code>optional int32 chunkIndex = 6;</code>
+     */
+    int getChunkIndex();
+
+    // optional int32 totalChunks = 7;
+    /**
+     * <code>optional int32 totalChunks = 7;</code>
+     */
+    boolean hasTotalChunks();
+    /**
+     * <code>optional int32 totalChunks = 7;</code>
+     */
+    int getTotalChunks();
+  }
+  /**
+   * Protobuf type {@code org.opendaylight.controller.cluster.raft.InstallSnapshot}
+   */
+  public static final class InstallSnapshot extends
+      com.google.protobuf.GeneratedMessage
+      implements InstallSnapshotOrBuilder {
+    // Use InstallSnapshot.newBuilder() to construct.
+    private InstallSnapshot(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+      super(builder);
+      this.unknownFields = builder.getUnknownFields();
+    }
+    private InstallSnapshot(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+    private static final InstallSnapshot defaultInstance;
+    public static InstallSnapshot getDefaultInstance() {
+      return defaultInstance;
+    }
+
+    public InstallSnapshot getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+
+    private final com.google.protobuf.UnknownFieldSet unknownFields;
+    @java.lang.Override
+    public final com.google.protobuf.UnknownFieldSet
+        getUnknownFields() {
+      return this.unknownFields;
+    }
+    private InstallSnapshot(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      initFields();
+      int mutable_bitField0_ = 0;
+      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder();
+      try {
+        boolean done = false;
+        while (!done) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              done = true;
+              break;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                done = true;
+              }
+              break;
+            }
+            case 8: {
+              bitField0_ |= 0x00000001;
+              term_ = input.readInt64();
+              break;
+            }
+            case 18: {
+              bitField0_ |= 0x00000002;
+              leaderId_ = input.readBytes();
+              break;
+            }
+            case 24: {
+              bitField0_ |= 0x00000004;
+              lastIncludedIndex_ = input.readInt64();
+              break;
+            }
+            case 32: {
+              bitField0_ |= 0x00000008;
+              lastIncludedTerm_ = input.readInt64();
+              break;
+            }
+            case 42: {
+              bitField0_ |= 0x00000010;
+              data_ = input.readBytes();
+              break;
+            }
+            case 48: {
+              bitField0_ |= 0x00000020;
+              chunkIndex_ = input.readInt32();
+              break;
+            }
+            case 56: {
+              bitField0_ |= 0x00000040;
+              totalChunks_ = input.readInt32();
+              break;
+            }
+          }
+        }
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        throw e.setUnfinishedMessage(this);
+      } catch (java.io.IOException e) {
+        throw new com.google.protobuf.InvalidProtocolBufferException(
+            e.getMessage()).setUnfinishedMessage(this);
+      } finally {
+        this.unknownFields = unknownFields.build();
+        makeExtensionsImmutable();
+      }
+    }
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+    }
+
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable
+          .ensureFieldAccessorsInitialized(
+              org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.Builder.class);
+    }
+
+    public static com.google.protobuf.Parser<InstallSnapshot> PARSER =
+        new com.google.protobuf.AbstractParser<InstallSnapshot>() {
+      public InstallSnapshot parsePartialFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return new InstallSnapshot(input, extensionRegistry);
+      }
+    };
+
+    @java.lang.Override
+    public com.google.protobuf.Parser<InstallSnapshot> getParserForType() {
+      return PARSER;
+    }
+
+    private int bitField0_;
+    // optional int64 term = 1;
+    public static final int TERM_FIELD_NUMBER = 1;
+    private long term_;
+    /**
+     * <code>optional int64 term = 1;</code>
+     */
+    public boolean hasTerm() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>optional int64 term = 1;</code>
+     */
+    public long getTerm() {
+      return term_;
+    }
+
+    // optional string leaderId = 2;
+    public static final int LEADERID_FIELD_NUMBER = 2;
+    private java.lang.Object leaderId_;
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    public boolean hasLeaderId() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    public java.lang.String getLeaderId() {
+      java.lang.Object ref = leaderId_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          leaderId_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    public com.google.protobuf.ByteString
+        getLeaderIdBytes() {
+      java.lang.Object ref = leaderId_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        leaderId_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
+    // optional int64 lastIncludedIndex = 3;
+    public static final int LASTINCLUDEDINDEX_FIELD_NUMBER = 3;
+    private long lastIncludedIndex_;
+    /**
+     * <code>optional int64 lastIncludedIndex = 3;</code>
+     */
+    public boolean hasLastIncludedIndex() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    /**
+     * <code>optional int64 lastIncludedIndex = 3;</code>
+     */
+    public long getLastIncludedIndex() {
+      return lastIncludedIndex_;
+    }
+
+    // optional int64 lastIncludedTerm = 4;
+    public static final int LASTINCLUDEDTERM_FIELD_NUMBER = 4;
+    private long lastIncludedTerm_;
+    /**
+     * <code>optional int64 lastIncludedTerm = 4;</code>
+     */
+    public boolean hasLastIncludedTerm() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    /**
+     * <code>optional int64 lastIncludedTerm = 4;</code>
+     */
+    public long getLastIncludedTerm() {
+      return lastIncludedTerm_;
+    }
+
+    // optional bytes data = 5;
+    public static final int DATA_FIELD_NUMBER = 5;
+    private com.google.protobuf.ByteString data_;
+    /**
+     * <code>optional bytes data = 5;</code>
+     */
+    public boolean hasData() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    /**
+     * <code>optional bytes data = 5;</code>
+     */
+    public com.google.protobuf.ByteString getData() {
+      return data_;
+    }
+
+    // optional int32 chunkIndex = 6;
+    public static final int CHUNKINDEX_FIELD_NUMBER = 6;
+    private int chunkIndex_;
+    /**
+     * <code>optional int32 chunkIndex = 6;</code>
+     */
+    public boolean hasChunkIndex() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    /**
+     * <code>optional int32 chunkIndex = 6;</code>
+     */
+    public int getChunkIndex() {
+      return chunkIndex_;
+    }
+
+    // optional int32 totalChunks = 7;
+    public static final int TOTALCHUNKS_FIELD_NUMBER = 7;
+    private int totalChunks_;
+    /**
+     * <code>optional int32 totalChunks = 7;</code>
+     */
+    public boolean hasTotalChunks() {
+      return ((bitField0_ & 0x00000040) == 0x00000040);
+    }
+    /**
+     * <code>optional int32 totalChunks = 7;</code>
+     */
+    public int getTotalChunks() {
+      return totalChunks_;
+    }
+
+    private void initFields() {
+      term_ = 0L;
+      leaderId_ = "";
+      lastIncludedIndex_ = 0L;
+      lastIncludedTerm_ = 0L;
+      data_ = com.google.protobuf.ByteString.EMPTY;
+      chunkIndex_ = 0;
+      totalChunks_ = 0;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+
+      memoizedIsInitialized = 1;
+      return true;
+    }
+
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeInt64(1, term_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBytes(2, getLeaderIdBytes());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeInt64(3, lastIncludedIndex_);
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeInt64(4, lastIncludedTerm_);
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeBytes(5, data_);
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeInt32(6, chunkIndex_);
+      }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        output.writeInt32(7, totalChunks_);
+      }
+      getUnknownFields().writeTo(output);
+    }
+
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(1, term_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(2, getLeaderIdBytes());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(3, lastIncludedIndex_);
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(4, lastIncludedTerm_);
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(5, data_);
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(6, chunkIndex_);
+      }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(7, totalChunks_);
+      }
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input, extensionRegistry);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    /**
+     * Protobuf type {@code org.opendaylight.controller.cluster.raft.InstallSnapshot}
+     */
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshotOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+      }
+
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable
+            .ensureFieldAccessorsInitialized(
+                org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.Builder.class);
+      }
+
+      // Construct using org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+
+      private Builder(
+          com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+
+      public Builder clear() {
+        super.clear();
+        term_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        leaderId_ = "";
+        bitField0_ = (bitField0_ & ~0x00000002);
+        lastIncludedIndex_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000004);
+        lastIncludedTerm_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000008);
+        data_ = com.google.protobuf.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000010);
+        chunkIndex_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000020);
+        totalChunks_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000040);
+        return this;
+      }
+
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+      }
+
+      public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot getDefaultInstanceForType() {
+        return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance();
+      }
+
+      public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot build() {
+        org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+
+      public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot buildPartial() {
+        org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot result = new org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.term_ = term_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.leaderId_ = leaderId_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.lastIncludedIndex_ = lastIncludedIndex_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.lastIncludedTerm_ = lastIncludedTerm_;
+        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.data_ = data_;
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.chunkIndex_ = chunkIndex_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000040;
+        }
+        result.totalChunks_ = totalChunks_;
+        result.bitField0_ = to_bitField0_;
+        onBuilt();
+        return result;
+      }
+
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot) {
+          return mergeFrom((org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+
+      public Builder mergeFrom(org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot other) {
+        if (other == org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance()) return this;
+        if (other.hasTerm()) {
+          setTerm(other.getTerm());
+        }
+        if (other.hasLeaderId()) {
+          bitField0_ |= 0x00000002;
+          leaderId_ = other.leaderId_;
+          onChanged();
+        }
+        if (other.hasLastIncludedIndex()) {
+          setLastIncludedIndex(other.getLastIncludedIndex());
+        }
+        if (other.hasLastIncludedTerm()) {
+          setLastIncludedTerm(other.getLastIncludedTerm());
+        }
+        if (other.hasData()) {
+          setData(other.getData());
+        }
+        if (other.hasChunkIndex()) {
+          setChunkIndex(other.getChunkIndex());
+        }
+        if (other.hasTotalChunks()) {
+          setTotalChunks(other.getTotalChunks());
+        }
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+
+      public final boolean isInitialized() {
+        return true;
+      }
+
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parsedMessage = null;
+        try {
+          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+          parsedMessage = (org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot) e.getUnfinishedMessage();
+          throw e;
+        } finally {
+          if (parsedMessage != null) {
+            mergeFrom(parsedMessage);
+          }
+        }
+        return this;
+      }
+      private int bitField0_;
+
+      // optional int64 term = 1;
+      private long term_ ;
+      /**
+       * <code>optional int64 term = 1;</code>
+       */
+      public boolean hasTerm() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      /**
+       * <code>optional int64 term = 1;</code>
+       */
+      public long getTerm() {
+        return term_;
+      }
+      /**
+       * <code>optional int64 term = 1;</code>
+       */
+      public Builder setTerm(long value) {
+        bitField0_ |= 0x00000001;
+        term_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 term = 1;</code>
+       */
+      public Builder clearTerm() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        term_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // optional string leaderId = 2;
+      private java.lang.Object leaderId_ = "";
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public boolean hasLeaderId() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public java.lang.String getLeaderId() {
+        java.lang.Object ref = leaderId_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          leaderId_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public com.google.protobuf.ByteString
+          getLeaderIdBytes() {
+        java.lang.Object ref = leaderId_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          leaderId_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public Builder setLeaderId(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        leaderId_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public Builder clearLeaderId() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        leaderId_ = getDefaultInstance().getLeaderId();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public Builder setLeaderIdBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        leaderId_ = value;
+        onChanged();
+        return this;
+      }
+
+      // optional int64 lastIncludedIndex = 3;
+      private long lastIncludedIndex_ ;
+      /**
+       * <code>optional int64 lastIncludedIndex = 3;</code>
+       */
+      public boolean hasLastIncludedIndex() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      /**
+       * <code>optional int64 lastIncludedIndex = 3;</code>
+       */
+      public long getLastIncludedIndex() {
+        return lastIncludedIndex_;
+      }
+      /**
+       * <code>optional int64 lastIncludedIndex = 3;</code>
+       */
+      public Builder setLastIncludedIndex(long value) {
+        bitField0_ |= 0x00000004;
+        lastIncludedIndex_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 lastIncludedIndex = 3;</code>
+       */
+      public Builder clearLastIncludedIndex() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        lastIncludedIndex_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // optional int64 lastIncludedTerm = 4;
+      private long lastIncludedTerm_ ;
+      /**
+       * <code>optional int64 lastIncludedTerm = 4;</code>
+       */
+      public boolean hasLastIncludedTerm() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      /**
+       * <code>optional int64 lastIncludedTerm = 4;</code>
+       */
+      public long getLastIncludedTerm() {
+        return lastIncludedTerm_;
+      }
+      /**
+       * <code>optional int64 lastIncludedTerm = 4;</code>
+       */
+      public Builder setLastIncludedTerm(long value) {
+        bitField0_ |= 0x00000008;
+        lastIncludedTerm_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 lastIncludedTerm = 4;</code>
+       */
+      public Builder clearLastIncludedTerm() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        lastIncludedTerm_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // optional bytes data = 5;
+      private com.google.protobuf.ByteString data_ = com.google.protobuf.ByteString.EMPTY;
+      /**
+       * <code>optional bytes data = 5;</code>
+       */
+      public boolean hasData() {
+        return ((bitField0_ & 0x00000010) == 0x00000010);
+      }
+      /**
+       * <code>optional bytes data = 5;</code>
+       */
+      public com.google.protobuf.ByteString getData() {
+        return data_;
+      }
+      /**
+       * <code>optional bytes data = 5;</code>
+       */
+      public Builder setData(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000010;
+        data_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bytes data = 5;</code>
+       */
+      public Builder clearData() {
+        bitField0_ = (bitField0_ & ~0x00000010);
+        data_ = getDefaultInstance().getData();
+        onChanged();
+        return this;
+      }
+
+      // optional int32 chunkIndex = 6;
+      private int chunkIndex_ ;
+      /**
+       * <code>optional int32 chunkIndex = 6;</code>
+       */
+      public boolean hasChunkIndex() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      /**
+       * <code>optional int32 chunkIndex = 6;</code>
+       */
+      public int getChunkIndex() {
+        return chunkIndex_;
+      }
+      /**
+       * <code>optional int32 chunkIndex = 6;</code>
+       */
+      public Builder setChunkIndex(int value) {
+        bitField0_ |= 0x00000020;
+        chunkIndex_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 chunkIndex = 6;</code>
+       */
+      public Builder clearChunkIndex() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        chunkIndex_ = 0;
+        onChanged();
+        return this;
+      }
+
+      // optional int32 totalChunks = 7;
+      private int totalChunks_ ;
+      /**
+       * <code>optional int32 totalChunks = 7;</code>
+       */
+      public boolean hasTotalChunks() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      /**
+       * <code>optional int32 totalChunks = 7;</code>
+       */
+      public int getTotalChunks() {
+        return totalChunks_;
+      }
+      /**
+       * <code>optional int32 totalChunks = 7;</code>
+       */
+      public Builder setTotalChunks(int value) {
+        bitField0_ |= 0x00000040;
+        totalChunks_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 totalChunks = 7;</code>
+       */
+      public Builder clearTotalChunks() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        totalChunks_ = 0;
+        onChanged();
+        return this;
+      }
+
+      // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.cluster.raft.InstallSnapshot)
+    }
+
+    static {
+      defaultInstance = new InstallSnapshot(true);
+      defaultInstance.initFields();
+    }
+
+    // @@protoc_insertion_point(class_scope:org.opendaylight.controller.cluster.raft.InstallSnapshot)
+  }
+
+  private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable;
+
+  public static com.google.protobuf.Descriptors.FileDescriptor
+      getDescriptor() {
+    return descriptor;
+  }
+  private static com.google.protobuf.Descriptors.FileDescriptor
+      descriptor;
+  static {
+    java.lang.String[] descriptorData = {
+      "\n\025InstallSnapshot.proto\022(org.opendayligh" +
+      "t.controller.cluster.raft\"\235\001\n\017InstallSna" +
+      "pshot\022\014\n\004term\030\001 \001(\003\022\020\n\010leaderId\030\002 \001(\t\022\031\n" +
+      "\021lastIncludedIndex\030\003 \001(\003\022\030\n\020lastIncluded" +
+      "Term\030\004 \001(\003\022\014\n\004data\030\005 \001(\014\022\022\n\nchunkIndex\030\006" +
+      " \001(\005\022\023\n\013totalChunks\030\007 \001(\005BX\n;org.openday" +
+      "light.controller.cluster.raft.protobuff." +
+      "messagesB\027InstallSnapshotMessagesH\001"
+    };
+    com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
+      new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
+        public com.google.protobuf.ExtensionRegistry assignDescriptors(
+            com.google.protobuf.Descriptors.FileDescriptor root) {
+          descriptor = root;
+          internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor =
+            getDescriptor().getMessageTypes().get(0);
+          internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor,
+              new java.lang.String[] { "Term", "LeaderId", "LastIncludedIndex", "LastIncludedTerm", "Data", "ChunkIndex", "TotalChunks", });
+          return null;
+        }
+      };
+    com.google.protobuf.Descriptors.FileDescriptor
+      .internalBuildGeneratedFileFrom(descriptorData,
+        new com.google.protobuf.Descriptors.FileDescriptor[] {
+        }, assigner);
+  }
+
+  // @@protoc_insertion_point(outer_class_scope)
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/resources/InstallSnapshot.proto b/opendaylight/md-sal/sal-akka-raft/src/main/resources/InstallSnapshot.proto
new file mode 100644 (file)
index 0000000..14f821b
--- /dev/null
@@ -0,0 +1,15 @@
+package org.opendaylight.controller.cluster.raft;
+
+option java_package = "org.opendaylight.controller.cluster.raft.protobuff.messages";
+option java_outer_classname = "InstallSnapshotMessages";
+option optimize_for = SPEED;
+
+message InstallSnapshot {
+    optional int64 term = 1;
+    optional string leaderId = 2;
+    optional int64 lastIncludedIndex = 3;
+    optional int64 lastIncludedTerm = 4;
+    optional bytes data = 5;
+    optional int32 chunkIndex = 6;
+    optional int32 totalChunks = 7;
+}
index ea3c9e7..ca34a34 100644 (file)
@@ -14,17 +14,14 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
+import com.google.common.base.Preconditions;
 import com.google.protobuf.GeneratedMessage;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
-import com.google.common.base.Preconditions;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 public class MockRaftActorContext implements RaftActorContext {
@@ -37,6 +34,7 @@ public class MockRaftActorContext implements RaftActorContext {
     private final ElectionTerm electionTerm;
     private ReplicatedLog replicatedLog;
     private Map<String, String> peerAddresses = new HashMap();
+    private ConfigParams configParams;
 
     public MockRaftActorContext(){
         electionTerm = null;
@@ -79,6 +77,8 @@ public class MockRaftActorContext implements RaftActorContext {
             }
         };
 
+        configParams = new DefaultConfigParamsImpl();
+
         initReplicatedLog();
     }
 
@@ -179,118 +179,21 @@ public class MockRaftActorContext implements RaftActorContext {
 
     @Override
     public ConfigParams getConfigParams() {
-        return new DefaultConfigParamsImpl();
+        return configParams;
     }
 
-    public static class SimpleReplicatedLog implements ReplicatedLog {
-        private final List<ReplicatedLogEntry> log = new ArrayList<>();
-
-        @Override public ReplicatedLogEntry get(long index) {
-            if(index >= log.size() || index < 0){
-                return null;
-            }
-            return log.get((int) index);
-        }
-
-        @Override public ReplicatedLogEntry last() {
-            if(log.size() == 0){
-                return null;
-            }
-            return log.get(log.size()-1);
-        }
-
-        @Override public long lastIndex() {
-            if(log.size() == 0){
-                return -1;
-            }
-
-            return last().getIndex();
-        }
-
-        @Override public long lastTerm() {
-            if(log.size() == 0){
-                return -1;
-            }
-
-            return last().getTerm();
-        }
-
-        @Override public void removeFrom(long index) {
-            if(index >= log.size() || index < 0){
-                return;
-            }
-
-            log.subList((int) index, log.size()).clear();
-            //log.remove((int) index);
-        }
-
-        @Override public void removeFromAndPersist(long index) {
-            removeFrom(index);
-        }
-
-        @Override public void append(ReplicatedLogEntry replicatedLogEntry) {
-            log.add(replicatedLogEntry);
-        }
+    public void setConfigParams(ConfigParams configParams) {
+        this.configParams = configParams;
+    }
 
+    public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
         @Override public void appendAndPersist(
             ReplicatedLogEntry replicatedLogEntry) {
             append(replicatedLogEntry);
         }
 
-        @Override public List<ReplicatedLogEntry> getFrom(long index) {
-            if(index >= log.size() || index < 0){
-                return Collections.EMPTY_LIST;
-            }
-            List<ReplicatedLogEntry> entries = new ArrayList<>();
-            for(int i=(int) index ; i < log.size() ; i++) {
-                entries.add(get(i));
-            }
-            return entries;
-        }
-
-        @Override public List<ReplicatedLogEntry> getFrom(long index, int max) {
-            if(index >= log.size() || index < 0){
-                return Collections.EMPTY_LIST;
-            }
-            List<ReplicatedLogEntry> entries = new ArrayList<>();
-            int maxIndex = (int) index + max;
-            if(maxIndex > log.size()){
-                maxIndex = log.size();
-            }
-
-            for(int i=(int) index ; i < maxIndex ; i++) {
-                entries.add(get(i));
-            }
-            return entries;
-
-        }
-
-        @Override public long size() {
-            return log.size();
-        }
-
-        @Override public boolean isPresent(long index) {
-            if(index >= log.size() || index < 0){
-                return false;
-            }
-
-            return true;
-        }
-
-        @Override public boolean isInSnapshot(long index) {
-            return false;
-        }
-
-        @Override public Object getSnapshot() {
-            return null;
-        }
-
-        @Override public long getSnapshotIndex() {
-            return -1;
-        }
-
-        @Override public long getSnapshotTerm() {
-            return -1;
+        @Override public void removeFromAndPersist(long index) {
+            removeFrom(index);
         }
     }
 
index ff0ffeb..12123db 100644 (file)
@@ -6,6 +6,7 @@ import akka.actor.Props;
 import akka.event.Logging;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
+import com.google.protobuf.ByteString;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
@@ -39,11 +40,11 @@ public class RaftActorTest extends AbstractActorTest {
             Object data) {
         }
 
-        @Override protected Object createSnapshot() {
+        @Override protected void createSnapshot() {
             throw new UnsupportedOperationException("createSnapshot");
         }
 
-        @Override protected void applySnapshot(Object snapshot) {
+        @Override protected void applySnapshot(ByteString snapshot) {
             throw new UnsupportedOperationException("applySnapshot");
         }
 
index c5a81aa..227d1ef 100644 (file)
@@ -158,17 +158,18 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 createActorContext();
 
             context.setLastApplied(100);
-            setLastLogEntry((MockRaftActorContext) context, 0, 0, new MockRaftActorContext.MockPayload(""));
+            setLastLogEntry((MockRaftActorContext) context, 1, 100, new MockRaftActorContext.MockPayload(""));
+            ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99);
 
             List<ReplicatedLogEntry> entries =
                 Arrays.asList(
-                    (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(100, 101,
+                    (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101,
                         new MockRaftActorContext.MockPayload("foo"))
                 );
 
             // The new commitIndex is 101
             AppendEntries appendEntries =
-                new AppendEntries(100, "leader-1", 0, 0, entries, 101);
+                new AppendEntries(2, "leader-1", 100, 1, entries, 101);
 
             RaftState raftState =
                 createBehavior(context).handleMessage(getRef(), appendEntries);
index 17c22a1..73c9f96 100644 (file)
@@ -1,24 +1,40 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
-import junit.framework.Assert;
+import com.google.protobuf.ByteString;
+import org.junit.Assert;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
+import org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
@@ -82,8 +98,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     assertEquals("match", out);
 
                 }
-
-
             };
         }};
     }
@@ -194,18 +208,372 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     assertEquals("match", out);
 
                 }
+            };
+        }};
+    }
+
+    @Test
+    public void testSendInstallSnapshot() {
+        new LeaderTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+                    ActorRef followerActor = getTestActor();
+
+                    Map<String, String> peerAddresses = new HashMap();
+                    peerAddresses.put(followerActor.path().toString(),
+                        followerActor.path().toString());
+
+
+                    MockRaftActorContext actorContext =
+                        (MockRaftActorContext) createActorContext(getRef());
+                    actorContext.setPeerAddresses(peerAddresses);
+
+
+                    Map<String, String> leadersSnapshot = new HashMap<>();
+                    leadersSnapshot.put("1", "A");
+                    leadersSnapshot.put("2", "B");
+                    leadersSnapshot.put("3", "C");
+
+                    //clears leaders log
+                    actorContext.getReplicatedLog().removeFrom(0);
+
+                    final int followersLastIndex = 2;
+                    final int snapshotIndex = 3;
+                    final int newEntryIndex = 4;
+                    final int snapshotTerm = 1;
+                    final int currentTerm = 2;
+
+                    // set the snapshot variables in replicatedlog
+                    actorContext.getReplicatedLog().setSnapshot(
+                        toByteString(leadersSnapshot));
+                    actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+                    actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+
+                    MockLeader leader = new MockLeader(actorContext);
+                    // set the follower info in leader
+                    leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
+
+                    // new entry
+                    ReplicatedLogImplEntry entry =
+                        new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                            new MockRaftActorContext.MockPayload("D"));
+
+                    // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
+                    RaftState raftState = leader.handleMessage(
+                        senderActor, new Replicate(null, "state-id", entry));
+
+                    assertEquals(RaftState.Leader, raftState);
+
+                    // we might receive some heartbeat messages, so wait till we SendInstallSnapshot
+                    Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
+                        @Override
+                        protected Boolean match(Object o) throws Exception {
+                            if (o instanceof SendInstallSnapshot) {
+                                return true;
+                            }
+                            return false;
+                        }
+                    }.get();
+
+                    boolean sendInstallSnapshotReceived = false;
+                    for (Boolean b: matches) {
+                        sendInstallSnapshotReceived = b | sendInstallSnapshotReceived;
+                    }
+
+                    assertTrue(sendInstallSnapshotReceived);
+
+                }
+            };
+        }};
+    }
+
+    @Test
+    public void testInstallSnapshot() {
+        new LeaderTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+                    ActorRef followerActor = getTestActor();
+
+                    Map<String, String> peerAddresses = new HashMap();
+                    peerAddresses.put(followerActor.path().toString(),
+                        followerActor.path().toString());
+
+                    MockRaftActorContext actorContext =
+                        (MockRaftActorContext) createActorContext();
+                    actorContext.setPeerAddresses(peerAddresses);
+
+
+                    Map<String, String> leadersSnapshot = new HashMap<>();
+                    leadersSnapshot.put("1", "A");
+                    leadersSnapshot.put("2", "B");
+                    leadersSnapshot.put("3", "C");
+
+                    //clears leaders log
+                    actorContext.getReplicatedLog().removeFrom(0);
+
+                    final int followersLastIndex = 2;
+                    final int snapshotIndex = 3;
+                    final int newEntryIndex = 4;
+                    final int snapshotTerm = 1;
+                    final int currentTerm = 2;
+
+                    // set the snapshot variables in replicatedlog
+                    actorContext.getReplicatedLog().setSnapshot(toByteString(leadersSnapshot));
+                    actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+                    actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+
+                    actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+                    MockLeader leader = new MockLeader(actorContext);
+                    // set the follower info in leader
+                    leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
+
+                    // new entry
+                    ReplicatedLogImplEntry entry =
+                        new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                            new MockRaftActorContext.MockPayload("D"));
+
+
+                    RaftState raftState = leader.handleMessage(senderActor, new SendInstallSnapshot());
+
+                    assertEquals(RaftState.Leader, raftState);
+
+                    // check if installsnapshot gets called with the correct values.
+                    final String out =
+                        new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+                            // do not put code outside this method, will run afterwards
+                            protected String match(Object in) {
+                                if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
+                                    InstallSnapshot is = (InstallSnapshot)
+                                        SerializationUtils.fromSerializable(in);
+                                    if (is.getData() == null) {
+                                        return "InstallSnapshot data is null";
+                                    }
+                                    if (is.getLastIncludedIndex() != snapshotIndex) {
+                                        return is.getLastIncludedIndex() + "!=" + snapshotIndex;
+                                    }
+                                    if (is.getLastIncludedTerm() != snapshotTerm) {
+                                        return is.getLastIncludedTerm() + "!=" + snapshotTerm;
+                                    }
+                                    if (is.getTerm() == currentTerm) {
+                                        return is.getTerm() + "!=" + currentTerm;
+                                    }
+
+                                    return "match";
+
+                               } else {
+                                    return "message mismatch:" + in.getClass();
+                                }
+                            }
+                        }.get(); // this extracts the received message
+
+                    assertEquals("match", out);
+                }
+            };
+        }};
+    }
+
+    @Test
+    public void testHandleInstallSnapshotReplyLastChunk() {
+        new LeaderTestKit(getSystem()) {{
+            new Within(duration("1 seconds")) {
+                protected void run() {
+                    ActorRef followerActor = getTestActor();
+
+                    Map<String, String> peerAddresses = new HashMap();
+                    peerAddresses.put(followerActor.path().toString(),
+                        followerActor.path().toString());
+
+                    MockRaftActorContext actorContext =
+                        (MockRaftActorContext) createActorContext();
+                    actorContext.setPeerAddresses(peerAddresses);
+
+                    final int followersLastIndex = 2;
+                    final int snapshotIndex = 3;
+                    final int newEntryIndex = 4;
+                    final int snapshotTerm = 1;
+                    final int currentTerm = 2;
+
+                    MockLeader leader = new MockLeader(actorContext);
+                    // set the follower info in leader
+                    leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
+
+                    Map<String, String> leadersSnapshot = new HashMap<>();
+                    leadersSnapshot.put("1", "A");
+                    leadersSnapshot.put("2", "B");
+                    leadersSnapshot.put("3", "C");
+
+                    // set the snapshot variables in replicatedlog
+                    actorContext.getReplicatedLog().setSnapshot(
+                        toByteString(leadersSnapshot));
+                    actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+                    actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+                    actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+                    ByteString bs = toByteString(leadersSnapshot);
+                    leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
+                    while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
+                        leader.getFollowerToSnapshot().getNextChunk();
+                        leader.getFollowerToSnapshot().incrementChunkIndex();
+                    }
+
+                    //clears leaders log
+                    actorContext.getReplicatedLog().removeFrom(0);
 
+                    RaftState raftState = leader.handleMessage(senderActor,
+                        new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
+                            leader.getFollowerToSnapshot().getChunkIndex(), true));
 
+                    assertEquals(RaftState.Leader, raftState);
+
+                    assertEquals(leader.mapFollowerToSnapshot.size(), 0);
+                    assertEquals(leader.followerToLog.size(), 1);
+                    assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
+                    FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
+                    assertEquals(snapshotIndex, fli.getMatchIndex().get());
+                    assertEquals(snapshotIndex, fli.getMatchIndex().get());
+                    assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
+                }
             };
         }};
     }
 
+    @Test
+    public void testFollowerToSnapshotLogic() {
+
+        MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
+
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public int getSnapshotChunkSize() {
+                return 50;
+            }
+        });
+
+        MockLeader leader = new MockLeader(actorContext);
+
+        Map<String, String> leadersSnapshot = new HashMap<>();
+        leadersSnapshot.put("1", "A");
+        leadersSnapshot.put("2", "B");
+        leadersSnapshot.put("3", "C");
+
+        ByteString bs = toByteString(leadersSnapshot);
+        byte[] barray = bs.toByteArray();
+
+        leader.createFollowerToSnapshot("followerId", bs);
+        assertEquals(bs.size(), barray.length);
+
+        int chunkIndex=0;
+        for (int i=0; i < barray.length; i = i + 50) {
+            int j = i + 50;
+            chunkIndex++;
+
+            if (i + 50 > barray.length) {
+                j = barray.length;
+            }
+
+            ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
+            assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
+            assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
+
+            leader.getFollowerToSnapshot().markSendStatus(true);
+            if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
+                leader.getFollowerToSnapshot().incrementChunkIndex();
+            }
+        }
+
+        assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
+    }
+
+
     @Override protected RaftActorBehavior createBehavior(
         RaftActorContext actorContext) {
         return new Leader(actorContext);
     }
 
     @Override protected RaftActorContext createActorContext() {
-        return new MockRaftActorContext("test", getSystem(), leaderActor);
+        return createActorContext(leaderActor);
+    }
+
+    protected RaftActorContext createActorContext(ActorRef actorRef) {
+        return new MockRaftActorContext("test", getSystem(), actorRef);
+    }
+
+    private ByteString toByteString(Map<String, String> state) {
+        ByteArrayOutputStream b = null;
+        ObjectOutputStream o = null;
+        try {
+            try {
+                b = new ByteArrayOutputStream();
+                o = new ObjectOutputStream(b);
+                o.writeObject(state);
+                byte[] snapshotBytes = b.toByteArray();
+                return ByteString.copyFrom(snapshotBytes);
+            } finally {
+                if (o != null) {
+                    o.flush();
+                    o.close();
+                }
+                if (b != null) {
+                    b.close();
+                }
+            }
+        } catch (IOException e) {
+            Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
+        }
+        return null;
+    }
+
+    private static class LeaderTestKit extends JavaTestKit {
+
+        private LeaderTestKit(ActorSystem actorSystem) {
+            super(actorSystem);
+        }
+
+        protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
+            // Wait for a specific log message to show up
+            final boolean result =
+            new JavaTestKit.EventFilter<Boolean>(logLevel
+            ) {
+                @Override
+                protected Boolean run() {
+                    return true;
+                }
+            }.from(subject.path().toString())
+                .message(logMessage)
+                .occurrences(1).exec();
+
+            Assert.assertEquals(true, result);
+
+        }
+    }
+
+    class MockLeader extends Leader {
+
+        FollowerToSnapshot fts;
+
+        public MockLeader(RaftActorContext context){
+            super(context);
+        }
+
+        public void addToFollowerToLog(String followerId, long nextIndex, long matchIndex) {
+            FollowerLogInformation followerLogInformation =
+                new FollowerLogInformationImpl(followerId,
+                    new AtomicLong(nextIndex),
+                    new AtomicLong(matchIndex));
+            followerToLog.put(followerId, followerLogInformation);
+        }
+
+        public FollowerToSnapshot getFollowerToSnapshot() {
+            return fts;
+        }
+
+        public void createFollowerToSnapshot(String followerId, ByteString bs ) {
+            fts = new FollowerToSnapshot(bs);
+            mapFollowerToSnapshot.put(followerId, fts);
+
+        }
     }
 }
index 2d8e51c..d16170b 100644 (file)
@@ -7,11 +7,10 @@
  */
 package org.opendaylight.controller.md.sal.binding.impl;
 
-import java.util.Map;
-import java.util.WeakHashMap;
-
-import javax.annotation.concurrent.GuardedBy;
-
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
@@ -19,6 +18,7 @@ import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
@@ -26,20 +26,19 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
 import org.opendaylight.yangtools.concepts.Delegator;
 
-import com.google.common.base.Preconditions;
-
 class BindingTranslatedTransactionChain implements BindingTransactionChain, Delegator<DOMTransactionChain> {
 
     private final DOMTransactionChain delegate;
-
-    @GuardedBy("this")
-    private final Map<AsyncTransaction<?, ?>, AsyncTransaction<?, ?>> delegateTxToBindingTx = new WeakHashMap<>();
     private final BindingToNormalizedNodeCodec codec;
+    private final DelegateChainListener delegatingListener;
+    private final TransactionChainListener listener;
 
     public BindingTranslatedTransactionChain(final DOMDataBroker chainFactory,
             final BindingToNormalizedNodeCodec codec, final TransactionChainListener listener) {
         Preconditions.checkNotNull(chainFactory, "DOM Transaction chain factory must not be null");
-        this.delegate = chainFactory.createTransactionChain(new ListenerInvoker(listener));
+        this.delegatingListener = new DelegateChainListener();
+        this.listener = listener;
+        this.delegate = chainFactory.createTransactionChain(listener);
         this.codec = codec;
     }
 
@@ -52,56 +51,79 @@ class BindingTranslatedTransactionChain implements BindingTransactionChain, Dele
     public ReadOnlyTransaction newReadOnlyTransaction() {
         DOMDataReadOnlyTransaction delegateTx = delegate.newReadOnlyTransaction();
         ReadOnlyTransaction bindingTx = new BindingDataReadTransactionImpl(delegateTx, codec);
-        putDelegateToBinding(delegateTx, bindingTx);
         return bindingTx;
     }
 
     @Override
     public ReadWriteTransaction newReadWriteTransaction() {
         DOMDataReadWriteTransaction delegateTx = delegate.newReadWriteTransaction();
-        ReadWriteTransaction bindingTx = new BindingDataReadWriteTransactionImpl(delegateTx, codec);
-        putDelegateToBinding(delegateTx, bindingTx);
+        ReadWriteTransaction bindingTx = new BindingDataReadWriteTransactionImpl(delegateTx, codec) {
+
+            @Override
+            public CheckedFuture<Void, TransactionCommitFailedException> submit() {
+                return listenForFailure(this,super.submit());
+            }
+
+        };
         return bindingTx;
     }
 
     @Override
     public WriteTransaction newWriteOnlyTransaction() {
-        DOMDataWriteTransaction delegateTx = delegate.newWriteOnlyTransaction();
-        WriteTransaction bindingTx = new BindingDataWriteTransactionImpl<>(delegateTx, codec);
-        putDelegateToBinding(delegateTx, bindingTx);
+        final DOMDataWriteTransaction delegateTx = delegate.newWriteOnlyTransaction();
+        WriteTransaction bindingTx = new BindingDataWriteTransactionImpl<DOMDataWriteTransaction>(delegateTx, codec) {
+
+            @Override
+            public CheckedFuture<Void,TransactionCommitFailedException> submit() {
+                return listenForFailure(this,super.submit());
+            };
+
+        };
         return bindingTx;
     }
 
-    @Override
-    public void close() {
-        delegate.close();
+    protected CheckedFuture<Void, TransactionCommitFailedException> listenForFailure(
+            final WriteTransaction tx, CheckedFuture<Void, TransactionCommitFailedException> future) {
+        Futures.addCallback(future, new FutureCallback<Void>() {
+            @Override
+            public void onFailure(Throwable t) {
+                failTransactionChain(tx,t);
+            }
+
+            @Override
+            public void onSuccess(Void result) {
+                // Intentionally NOOP
+            }
+        });
+
+        return future;
     }
 
-    private synchronized void putDelegateToBinding(final AsyncTransaction<?, ?> domTx,
-            final AsyncTransaction<?, ?> bindingTx) {
-        final Object previous = delegateTxToBindingTx.put(domTx, bindingTx);
-        Preconditions.checkState(previous == null, "DOM Transaction %s has already associated binding transation %s",domTx,previous);
+    protected void failTransactionChain(WriteTransaction tx, Throwable t) {
+        // We asume correct state change for underlaying transaction
+        // chain, so we are not changing any of our internal state
+        // to mark that we failed.
+        this.delegatingListener.onTransactionChainFailed(this, tx, t);
     }
 
-    private synchronized AsyncTransaction<?, ?> getBindingTransaction(final AsyncTransaction<?, ?> transaction) {
-        return delegateTxToBindingTx.get(transaction);
+    @Override
+    public void close() {
+        delegate.close();
     }
 
-    private final class ListenerInvoker implements TransactionChainListener {
-
-        private final TransactionChainListener listener;
-
-        public ListenerInvoker(final TransactionChainListener listener) {
-            this.listener = Preconditions.checkNotNull(listener, "Listener must not be null.");
-        }
+    private final class DelegateChainListener implements TransactionChainListener {
 
         @Override
         public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
                 final AsyncTransaction<?, ?> transaction, final Throwable cause) {
-            Preconditions.checkState(delegate.equals(chain),
-                    "Illegal state - listener for %s was invoked for incorrect chain %s.", delegate, chain);
-            AsyncTransaction<?, ?> bindingTx = getBindingTransaction(transaction);
-            listener.onTransactionChainFailed(chain, bindingTx, cause);
+            /*
+             * Intentionally NOOP, callback for failure, since we
+             * are also listening on each transaction for failure.
+             *
+             * by listening on submit future for Binding transaction
+             * in order to provide Binding transaction (which was seen by client
+             * of this transaction chain, instead of
+            */
         }
 
         @Override
index b8980cd..a3619ec 100644 (file)
       </dependency>
   </dependencies>
   <build>
+
       <plugins>
           <plugin>
               <groupId>org.jacoco</groupId>
                   </execution>
               </executions>
           </plugin>
-      </plugins>
+          <plugin>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>maven-bundle-plugin</artifactId>
+            <extensions>true</extensions>
+            <configuration>
+            <instructions>
+                <Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
+                <Export-Package>org.opendaylight.controller.cluster.*,org.opendaylight.common.actor,org.opendaylight.common.reporting,org.opendaylight.controller.protobuff.*,org.opendaylight.controller.xml.*</Export-Package>
+                <Import-Package>*</Import-Package>
+            </instructions>
+            </configuration>
+          </plugin>
+    </plugins>
   </build>
-
 </project>
index fcf1f45..c867fce 100644 (file)
@@ -926,6 +926,16 @@ public final class PersistentMessages {
      */
     org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages.ModificationOrBuilder getModificationOrBuilder(
         int index);
+
+    // optional int64 timeStamp = 2;
+    /**
+     * <code>optional int64 timeStamp = 2;</code>
+     */
+    boolean hasTimeStamp();
+    /**
+     * <code>optional int64 timeStamp = 2;</code>
+     */
+    long getTimeStamp();
   }
   /**
    * Protobuf type {@code org.opendaylight.controller.mdsal.CompositeModification}
@@ -986,6 +996,11 @@ public final class PersistentMessages {
               modification_.add(input.readMessage(org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages.Modification.PARSER, extensionRegistry));
               break;
             }
+            case 16: {
+              bitField0_ |= 0x00000001;
+              timeStamp_ = input.readInt64();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -1028,6 +1043,7 @@ public final class PersistentMessages {
       return PARSER;
     }
 
+    private int bitField0_;
     // repeated .org.opendaylight.controller.mdsal.Modification modification = 1;
     public static final int MODIFICATION_FIELD_NUMBER = 1;
     private java.util.List<org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages.Modification> modification_;
@@ -1064,8 +1080,25 @@ public final class PersistentMessages {
       return modification_.get(index);
     }
 
+    // optional int64 timeStamp = 2;
+    public static final int TIMESTAMP_FIELD_NUMBER = 2;
+    private long timeStamp_;
+    /**
+     * <code>optional int64 timeStamp = 2;</code>
+     */
+    public boolean hasTimeStamp() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>optional int64 timeStamp = 2;</code>
+     */
+    public long getTimeStamp() {
+      return timeStamp_;
+    }
+
     private void initFields() {
       modification_ = java.util.Collections.emptyList();
+      timeStamp_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -1088,6 +1121,9 @@ public final class PersistentMessages {
       for (int i = 0; i < modification_.size(); i++) {
         output.writeMessage(1, modification_.get(i));
       }
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeInt64(2, timeStamp_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -1101,6 +1137,10 @@ public final class PersistentMessages {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(1, modification_.get(i));
       }
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(2, timeStamp_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1224,6 +1264,8 @@ public final class PersistentMessages {
         } else {
           modificationBuilder_.clear();
         }
+        timeStamp_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000002);
         return this;
       }
 
@@ -1251,6 +1293,7 @@ public final class PersistentMessages {
       public org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages.CompositeModification buildPartial() {
         org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages.CompositeModification result = new org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages.CompositeModification(this);
         int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
         if (modificationBuilder_ == null) {
           if (((bitField0_ & 0x00000001) == 0x00000001)) {
             modification_ = java.util.Collections.unmodifiableList(modification_);
@@ -1260,6 +1303,11 @@ public final class PersistentMessages {
         } else {
           result.modification_ = modificationBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.timeStamp_ = timeStamp_;
+        result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
       }
@@ -1301,6 +1349,9 @@ public final class PersistentMessages {
             }
           }
         }
+        if (other.hasTimeStamp()) {
+          setTimeStamp(other.getTimeStamp());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1574,6 +1625,39 @@ public final class PersistentMessages {
         return modificationBuilder_;
       }
 
+      // optional int64 timeStamp = 2;
+      private long timeStamp_ ;
+      /**
+       * <code>optional int64 timeStamp = 2;</code>
+       */
+      public boolean hasTimeStamp() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      /**
+       * <code>optional int64 timeStamp = 2;</code>
+       */
+      public long getTimeStamp() {
+        return timeStamp_;
+      }
+      /**
+       * <code>optional int64 timeStamp = 2;</code>
+       */
+      public Builder setTimeStamp(long value) {
+        bitField0_ |= 0x00000002;
+        timeStamp_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 timeStamp = 2;</code>
+       */
+      public Builder clearTimeStamp() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        timeStamp_ = 0L;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.CompositeModification)
     }
 
@@ -1610,11 +1694,12 @@ public final class PersistentMessages {
       "e\030\001 \002(\t\022C\n\004path\030\002 \002(\01325.org.opendaylight" +
       ".controller.mdsal.InstanceIdentifier\0225\n\004" +
       "data\030\003 \001(\0132\'.org.opendaylight.controller" +
-      ".mdsal.Node\"^\n\025CompositeModification\022E\n\014" +
+      ".mdsal.Node\"q\n\025CompositeModification\022E\n\014" +
       "modification\030\001 \003(\0132/.org.opendaylight.co" +
-      "ntroller.mdsal.ModificationBO\n9org.opend" +
-      "aylight.controller.protobuff.messages.pe",
-      "rsistentB\022PersistentMessages"
+      "ntroller.mdsal.Modification\022\021\n\ttimeStamp" +
+      "\030\002 \001(\003BO\n9org.opendaylight.controller.pr",
+      "otobuff.messages.persistentB\022PersistentM" +
+      "essages"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -1632,7 +1717,7 @@ public final class PersistentMessages {
           internal_static_org_opendaylight_controller_mdsal_CompositeModification_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_org_opendaylight_controller_mdsal_CompositeModification_descriptor,
-              new java.lang.String[] { "Modification", });
+              new java.lang.String[] { "Modification", "TimeStamp", });
           return null;
         }
       };
index 5848561..ea8f4a3 100644 (file)
@@ -75,9 +75,13 @@ public class XmlUtils {
    */
   public static String inputCompositeNodeToXml(CompositeNode cNode, SchemaContext schemaContext){
     LOG.debug("Converting input composite node to xml {}", cNode);
-    if (cNode == null) return BLANK;
+    if (cNode == null) {
+        return BLANK;
+    }
 
-    if(schemaContext == null) return BLANK;
+    if(schemaContext == null) {
+        return BLANK;
+    }
 
     Document domTree = null;
     try {
@@ -108,9 +112,13 @@ public class XmlUtils {
    */
   public static String outputCompositeNodeToXml(CompositeNode cNode, SchemaContext schemaContext){
     LOG.debug("Converting output composite node to xml {}", cNode);
-    if (cNode == null) return BLANK;
+    if (cNode == null) {
+        return BLANK;
+    }
 
-    if(schemaContext == null) return BLANK;
+    if(schemaContext == null) {
+        return BLANK;
+    }
 
     Document domTree = null;
     try {
@@ -150,7 +158,9 @@ public class XmlUtils {
   }
 
   public static CompositeNode xmlToCompositeNode(String xml){
-    if (xml==null || xml.length()==0) return null;
+    if (xml==null || xml.length()==0) {
+        return null;
+    }
 
     Node<?> dataTree;
     try {
@@ -179,11 +189,17 @@ public class XmlUtils {
    */
   public static CompositeNode inputXmlToCompositeNode(QName rpc, String xml,  SchemaContext schemaContext){
     LOG.debug("Converting input xml to composite node {}", xml);
-    if (xml==null || xml.length()==0) return null;
+    if (xml==null || xml.length()==0) {
+        return null;
+    }
 
-    if(rpc == null) return null;
+    if(rpc == null) {
+        return null;
+    }
 
-    if(schemaContext == null) return null;
+    if(schemaContext == null) {
+        return null;
+    }
 
     CompositeNode compositeNode = null;
     try {
@@ -213,7 +229,7 @@ public class XmlUtils {
           LOG.debug("Converted xml input to list of nodes  {}", dataNodes);
 
           final CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder();
-          it.setQName(input);
+          it.setQName(rpc);
           it.add(ImmutableCompositeNode.create(input, dataNodes));
           compositeNode = it.toInstance();
           break;
index 8e83449..72651ab 100644 (file)
@@ -11,10 +11,12 @@ message Modification {
     required string type=1;
     required InstanceIdentifier path=2;
     optional Node data=3;
+
 }
 
 
 message CompositeModification {
     repeated Modification modification=1;
+    optional int64 timeStamp = 2;
 }
 
index 31b658d..91c0b5c 100644 (file)
                   <type>xml</type>
                   <classifier>config</classifier>
                 </artifact>
+                <artifact>
+                  <file>${project.build.directory}/classes/initial/akka.conf</file>
+                  <type>xml</type>
+                  <classifier>akkaconf</classifier>
+                </artifact>
+                <artifact>
+                  <file>${project.build.directory}/classes/initial/module-shards.conf</file>
+                  <type>xml</type>
+                  <classifier>moduleshardconf</classifier>
+                </artifact>
+                <artifact>
+                  <file>${project.build.directory}/classes/initial/modules.conf</file>
+                  <type>xml</type>
+                  <classifier>moduleconf</classifier>
+                </artifact>
               </artifacts>
             </configuration>
           </execution>
index 0532213..5a2116b 100644 (file)
@@ -21,7 +21,7 @@ odl-cluster-data {
     remote {
       log-remote-lifecycle-events = off
       netty.tcp {
-        hostname = "<CHANGE_ME>"
+        hostname = "127.0.0.1"
         port = 2550
         maximum-frame-size = 419430400
         send-buffer-size = 52428800
@@ -30,9 +30,14 @@ odl-cluster-data {
     }
 
     cluster {
-      seed-nodes = ["akka.tcp://opendaylight-cluster-data@<CHANGE_SEED_IP>:2550"]
+      seed-nodes = ["akka.tcp://opendaylight-cluster-data@127.0.0.1:2550"]
 
       auto-down-unreachable-after = 10s
+
+      roles = [
+        "member-1"
+      ]
+
     }
   }
 }
@@ -51,13 +56,13 @@ odl-cluster-rpc {
     remote {
       log-remote-lifecycle-events = off
       netty.tcp {
-        hostname = "<CHANGE_ME>"
+        hostname = "127.0.0.1"
         port = 2551
       }
     }
 
     cluster {
-      seed-nodes = ["akka.tcp://opendaylight-cluster-rpc@<CHANGE_SEED_IP>:2551"]
+      seed-nodes = ["akka.tcp://opendaylight-cluster-rpc@127.0.0.1:2551"]
 
       auto-down-unreachable-after = 10s
     }
index dd5a7f2..8299822 100644 (file)
       <artifactId>akka-slf4j_${scala.version}</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-osgi_${scala.version}</artifactId>
+    </dependency>
+
     <!-- SAL Dependencies -->
 
     <dependency>
             <Export-package></Export-package>
             <Private-Package></Private-Package>
             <Import-Package>!*snappy;!org.jboss.*;!com.jcraft.*;!*jetty*;!sun.security.*;*</Import-Package>
+            <!--
             <Embed-Dependency>
                 sal-clustering-commons;
                 sal-akka-raft;
                 *scala*;
             </Embed-Dependency>
             <Embed-Transitive>true</Embed-Transitive>
+            -->
           </instructions>
         </configuration>
       </plugin>
index 15c0548..b326d61 100644 (file)
@@ -10,24 +10,55 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSystem;
 import akka.actor.Props;
-import com.google.common.base.Function;
+import akka.osgi.BundleDelegatingClassLoader;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.osgi.framework.BundleContext;
 
-import javax.annotation.Nullable;
+import java.io.File;
 
 public class ActorSystemFactory {
-    private static final ActorSystem actorSystem = (new Function<Void, ActorSystem>(){
-
-        @Nullable @Override public ActorSystem apply(@Nullable Void aVoid) {
-                ActorSystem system =
-                    ActorSystem.create("opendaylight-cluster-data", ConfigFactory
-                        .load().getConfig("odl-cluster-data"));
-                system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
-                return system;
-        }
-    }).apply(null);
+
+    public static final String AKKA_CONF_PATH = "./configuration/initial/akka.conf";
+    public static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data";
+    public static final String CONFIGURATION_NAME = "odl-cluster-data";
+
+    private static volatile ActorSystem actorSystem = null;
 
     public static final ActorSystem getInstance(){
         return actorSystem;
     }
+
+    /**
+     * This method should be called only once during initialization
+     *
+     * @param bundleContext
+     */
+    public static final ActorSystem createInstance(final BundleContext bundleContext) {
+        if(actorSystem == null) {
+            // Create an OSGi bundle classloader for actor system
+            BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
+                Thread.currentThread().getContextClassLoader());
+            synchronized (ActorSystemFactory.class) {
+                // Double check
+
+                if (actorSystem == null) {
+                    ActorSystem system = ActorSystem.create(ACTOR_SYSTEM_NAME,
+                        ConfigFactory.load(readAkkaConfiguration()).getConfig(CONFIGURATION_NAME), classLoader);
+                    system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
+                    actorSystem = system;
+                }
+            }
+        }
+
+        return actorSystem;
+    }
+
+
+    private static final Config readAkkaConfiguration(){
+        File defaultConfigFile = new File(AKKA_CONF_PATH);
+        Preconditions.checkState(defaultConfigFile.exists(), "akka.conf is missing");
+        return ConfigFactory.parseFile(defaultConfigFile);
+    }
 }
index 202ced9..0a137e0 100644 (file)
@@ -72,6 +72,8 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
                     actorSystem, actorSystem.actorOf(
                         ShardManager.props(type, cluster, configuration, datastoreContext).
                             withMailbox(ActorContext.MAILBOX), shardManagerId ), cluster, configuration);
+
+        actorContext.setOperationTimeout(dataStoreProperties.getOperationTimeoutInSeconds());
     }
 
     public DistributedDataStore(ActorContext actorContext) {
@@ -98,8 +100,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
 
         Object result = actorContext.executeLocalShardOperation(shardName,
-            new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
-            ActorContext.ASK_DURATION);
+            new RegisterChangeListener(path, dataChangeListenerActor.path(), scope));
 
         if (result != null) {
             RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
index 65a39a6..72b593f 100644 (file)
@@ -12,12 +12,13 @@ import akka.actor.ActorSystem;
 
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.sal.core.api.model.SchemaService;
+import org.osgi.framework.BundleContext;
 
 public class DistributedDataStoreFactory {
     public static DistributedDataStore createInstance(String name, SchemaService schemaService,
-            DistributedDataStoreProperties dataStoreProperties) {
+            DistributedDataStoreProperties dataStoreProperties, BundleContext bundleContext) {
 
-        ActorSystem actorSystem = ActorSystemFactory.getInstance();
+        ActorSystem actorSystem = ActorSystemFactory.createInstance(bundleContext);
         Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
         final DistributedDataStore dataStore =
             new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),
index eb6a536..df3245f 100644 (file)
@@ -18,21 +18,24 @@ public class DistributedDataStoreProperties {
     private final int maxShardDataChangeExecutorQueueSize;
     private final int maxShardDataChangeExecutorPoolSize;
     private final int shardTransactionIdleTimeoutInMinutes;
+    private final int operationTimeoutInSeconds;
 
     public DistributedDataStoreProperties() {
         maxShardDataChangeListenerQueueSize = 1000;
         maxShardDataChangeExecutorQueueSize = 1000;
         maxShardDataChangeExecutorPoolSize = 20;
         shardTransactionIdleTimeoutInMinutes = 10;
+        operationTimeoutInSeconds = 5;
     }
 
     public DistributedDataStoreProperties(int maxShardDataChangeListenerQueueSize,
             int maxShardDataChangeExecutorQueueSize, int maxShardDataChangeExecutorPoolSize,
-            int shardTransactionIdleTimeoutInMinutes) {
+            int shardTransactionIdleTimeoutInMinutes, int operationTimeoutInSeconds) {
         this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
         this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
         this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
         this.shardTransactionIdleTimeoutInMinutes = shardTransactionIdleTimeoutInMinutes;
+        this.operationTimeoutInSeconds = operationTimeoutInSeconds;
     }
 
     public int getMaxShardDataChangeListenerQueueSize() {
@@ -50,4 +53,8 @@ public class DistributedDataStoreProperties {
     public int getShardTransactionIdleTimeoutInMinutes() {
         return shardTransactionIdleTimeoutInMinutes;
     }
+
+    public int getOperationTimeoutInSeconds() {
+        return operationTimeoutInSeconds;
+    }
 }
index 43a9faa..6a6a181 100644 (file)
@@ -14,6 +14,7 @@ import akka.actor.Props;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Creator;
+import akka.persistence.RecoveryFailure;
 import akka.serialization.Serialization;
 
 import com.google.common.base.Optional;
@@ -21,7 +22,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
@@ -144,8 +145,19 @@ public class Shard extends RaftActor {
         return Props.create(new ShardCreator(name, peerAddresses, datastoreContext));
     }
 
+    @Override public void onReceiveRecover(Object message) {
+        LOG.debug("onReceiveRecover: Received message {} from {}", message.getClass().toString(),
+            getSender());
+
+        if (message instanceof RecoveryFailure){
+            LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
+        } else {
+            super.onReceiveRecover(message);
+        }
+    }
+
     @Override public void onReceiveCommand(Object message) {
-        LOG.debug("Received message {} from {}", message.getClass().toString(),
+        LOG.debug("onReceiveCommand: Received message {} from {}", message.getClass().toString(),
             getSender());
 
         if (message.getClass()
@@ -228,7 +240,8 @@ public class Shard extends RaftActor {
             .tell(new CreateTransactionReply(
                     Serialization.serializedActorPath(transactionActor),
                     createTransaction.getTransactionId()).toSerializable(),
-                getSelf());
+                getSelf()
+            );
     }
 
     private void commit(final ActorRef sender, Object serialized) {
@@ -266,9 +279,9 @@ public class Shard extends RaftActor {
         Futures.addCallback(future, new FutureCallback<Void>() {
             @Override
             public void onSuccess(Void v) {
-               sender.tell(new CommitTransactionReply().toSerializable(),self);
-               shardMBean.incrementCommittedTransactionCount();
-               shardMBean.setLastCommittedTransactionTime(new Date());
+                sender.tell(new CommitTransactionReply().toSerializable(), self);
+                shardMBean.incrementCommittedTransactionCount();
+                shardMBean.setLastCommittedTransactionTime(new Date());
             }
 
             @Override
@@ -365,7 +378,6 @@ public class Shard extends RaftActor {
                     identifier, clientActor.path().toString());
             }
 
-
         } else {
             LOG.error("Unknown state received {}", data);
         }
@@ -383,11 +395,11 @@ public class Shard extends RaftActor {
 
     }
 
-    @Override protected Object createSnapshot() {
+    @Override protected void createSnapshot() {
         throw new UnsupportedOperationException("createSnapshot");
     }
 
-    @Override protected void applySnapshot(Object snapshot) {
+    @Override protected void applySnapshot(ByteString snapshot) {
         throw new UnsupportedOperationException("applySnapshot");
     }
 
index c557118..a5be695 100644 (file)
@@ -151,8 +151,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
 
             ActorSelection cohort = actorContext.actorSelection(actorPath);
 
-            futureList.add(actorContext.executeRemoteOperationAsync(cohort, message,
-                    ActorContext.ASK_DURATION));
+            futureList.add(actorContext.executeRemoteOperationAsync(cohort, message));
         }
 
         return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
index fc1a3aa..a8b20c0 100644 (file)
@@ -353,8 +353,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         try {
             Object response = actorContext.executeShardOperation(shardName,
-                new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(),
-                ActorContext.ASK_DURATION);
+                new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable());
             if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
                 CreateTransactionReply reply =
                     CreateTransactionReply.fromSerializable(response);
@@ -472,7 +471,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // Send the ReadyTransaction message to the Tx actor.
 
             final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
-                    new ReadyTransaction().toSerializable(), ActorContext.ASK_DURATION);
+                    new ReadyTransaction().toSerializable());
 
             // Combine all the previously recorded put/merge/delete operation reply Futures and the
             // ReadyTransactionReply Future into one Future. If any one fails then the combined
@@ -532,23 +531,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         public void deleteData(YangInstanceIdentifier path) {
             LOG.debug("Tx {} deleteData called path = {}", identifier, path);
             recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
-                    new DeleteData(path).toSerializable(), ActorContext.ASK_DURATION ));
+                    new DeleteData(path).toSerializable() ));
         }
 
         @Override
         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
             LOG.debug("Tx {} mergeData called path = {}", identifier, path);
             recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
-                    new MergeData(path, data, schemaContext).toSerializable(),
-                    ActorContext.ASK_DURATION));
+                    new MergeData(path, data, schemaContext).toSerializable()));
         }
 
         @Override
         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
             LOG.debug("Tx {} writeData called path = {}", identifier, path);
             recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
-                    new WriteData(path, data, schemaContext).toSerializable(),
-                    ActorContext.ASK_DURATION));
+                    new WriteData(path, data, schemaContext).toSerializable()));
         }
 
         @Override
@@ -634,7 +631,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             };
 
             Future<Object> readFuture = actorContext.executeRemoteOperationAsync(getActor(),
-                    new ReadData(path).toSerializable(), ActorContext.ASK_DURATION);
+                    new ReadData(path).toSerializable());
             readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
         }
 
@@ -715,7 +712,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             };
 
             Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
-                    new DataExists(path).toSerializable(), ActorContext.ASK_DURATION);
+                    new DataExists(path).toSerializable());
             future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
         }
     }
index 1a005d8..04854d2 100644 (file)
@@ -52,6 +52,8 @@ public class MutableCompositeModification
         PersistentMessages.CompositeModification.Builder builder =
             PersistentMessages.CompositeModification.newBuilder();
 
+        builder.setTimeStamp(System.nanoTime());
+
         for (Modification m : modifications) {
             builder.addModification(
                 (PersistentMessages.Modification) m.toSerializable());
index 818a8ca..b87dc4f 100644 (file)
@@ -47,10 +47,7 @@ public class ActorContext {
     private static final Logger
         LOG = LoggerFactory.getLogger(ActorContext.class);
 
-    public static final FiniteDuration ASK_DURATION =
-        Duration.create(5, TimeUnit.SECONDS);
-    public static final Duration AWAIT_DURATION =
-        Duration.create(5, TimeUnit.SECONDS);
+    private static final FiniteDuration DEFAULT_OPER_DURATION = Duration.create(5, TimeUnit.SECONDS);
 
     public static final String MAILBOX = "bounded-mailbox";
 
@@ -59,6 +56,8 @@ public class ActorContext {
     private final ClusterWrapper clusterWrapper;
     private final Configuration configuration;
     private volatile SchemaContext schemaContext;
+    private FiniteDuration operationDuration = DEFAULT_OPER_DURATION;
+    private Timeout operationTimeout = new Timeout(operationDuration);
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
         ClusterWrapper clusterWrapper,
@@ -93,6 +92,11 @@ public class ActorContext {
         }
     }
 
+    public void setOperationTimeout(int timeoutInSeconds) {
+        operationDuration = Duration.create(timeoutInSeconds, TimeUnit.SECONDS);
+        operationTimeout = new Timeout(operationDuration);
+    }
+
     public SchemaContext getSchemaContext() {
         return schemaContext;
     }
@@ -117,7 +121,7 @@ public class ActorContext {
      */
     public ActorRef findLocalShard(String shardName) {
         Object result = executeLocalOperation(shardManager,
-            new FindLocalShard(shardName), ASK_DURATION);
+            new FindLocalShard(shardName));
 
         if (result instanceof LocalShardFound) {
             LocalShardFound found = (LocalShardFound) result;
@@ -133,7 +137,7 @@ public class ActorContext {
 
     public String findPrimaryPath(String shardName) {
         Object result = executeLocalOperation(shardManager,
-            new FindPrimary(shardName).toSerializable(), ASK_DURATION);
+            new FindPrimary(shardName).toSerializable());
 
         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
             PrimaryFound found = PrimaryFound.fromSerializable(result);
@@ -151,16 +155,13 @@ public class ActorContext {
      *
      * @param actor
      * @param message
-     * @param duration
      * @return The response of the operation
      */
-    public Object executeLocalOperation(ActorRef actor, Object message,
-        FiniteDuration duration) {
-        Future<Object> future =
-            ask(actor, message, new Timeout(duration));
+    public Object executeLocalOperation(ActorRef actor, Object message) {
+        Future<Object> future = ask(actor, message, operationTimeout);
 
         try {
-            return Await.result(future, AWAIT_DURATION);
+            return Await.result(future, operationDuration);
         } catch (Exception e) {
             throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
         }
@@ -171,21 +172,19 @@ public class ActorContext {
      *
      * @param actor
      * @param message
-     * @param duration
      * @return
      */
-    public Object executeRemoteOperation(ActorSelection actor, Object message,
-        FiniteDuration duration) {
+    public Object executeRemoteOperation(ActorSelection actor, Object message) {
 
         LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
 
-        Future<Object> future =
-            ask(actor, message, new Timeout(duration));
+        Future<Object> future = ask(actor, message, operationTimeout);
 
         try {
-            return Await.result(future, AWAIT_DURATION);
+            return Await.result(future, operationDuration);
         } catch (Exception e) {
-            throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
+            throw new TimeoutException("Sending message " + message.getClass().toString() +
+                    " to actor " + actor.toString() + " failed" , e);
         }
     }
 
@@ -194,15 +193,13 @@ public class ActorContext {
      *
      * @param actor the ActorSelection
      * @param message the message to send
-     * @param duration the maximum amount of time to send he message
      * @return a Future containing the eventual result
      */
-    public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message,
-            FiniteDuration duration) {
+    public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message) {
 
         LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
 
-        return ask(actor, message, new Timeout(duration));
+        return ask(actor, message, operationTimeout);
     }
 
     /**
@@ -225,16 +222,14 @@ public class ActorContext {
      *
      * @param shardName
      * @param message
-     * @param duration
      * @return
      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException         if the message to the remote shard times out
      * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
      */
-    public Object executeShardOperation(String shardName, Object message,
-        FiniteDuration duration) {
+    public Object executeShardOperation(String shardName, Object message) {
         ActorSelection primary = findPrimary(shardName);
 
-        return executeRemoteOperation(primary, message, duration);
+        return executeRemoteOperation(primary, message);
     }
 
     /**
@@ -246,19 +241,17 @@ public class ActorContext {
      *
      * @param shardName the name of the shard on which the operation needs to be executed
      * @param message the message that needs to be sent to the shard
-     * @param duration the time duration in which this operation should complete
      * @return the message that was returned by the local actor on which the
      *         the operation was executed. If a local shard was not found then
      *         null is returned
      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
      *         if the operation does not complete in a specified time duration
      */
-    public Object executeLocalShardOperation(String shardName, Object message,
-        FiniteDuration duration) {
+    public Object executeLocalShardOperation(String shardName, Object message) {
         ActorRef local = findLocalShard(shardName);
 
         if(local != null) {
-            return executeLocalOperation(local, message, duration);
+            return executeLocalOperation(local, message);
         }
 
         return null;
index f5a0d37..e2fbacb 100644 (file)
@@ -2,9 +2,12 @@ package org.opendaylight.controller.config.yang.config.distributed_datastore_pro
 
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreProperties;
+import org.osgi.framework.BundleContext;
 
 public class DistributedConfigDataStoreProviderModule extends
     org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedConfigDataStoreProviderModule {
+    private BundleContext bundleContext;
+
     public DistributedConfigDataStoreProviderModule(
         org.opendaylight.controller.config.api.ModuleIdentifier identifier,
         org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
@@ -33,9 +36,15 @@ public class DistributedConfigDataStoreProviderModule extends
         }
 
         return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
-                new DistributedDataStoreProperties(props.getMaxShardDataChangeExecutorPoolSize(),
-                        props.getMaxShardDataChangeExecutorQueueSize(),
-                        props.getMaxShardDataChangeListenerQueueSize(),
-                        props.getShardTransactionIdleTimeoutInMinutes()));
+                new DistributedDataStoreProperties(
+                        props.getMaxShardDataChangeExecutorPoolSize().getValue(),
+                        props.getMaxShardDataChangeExecutorQueueSize().getValue(),
+                        props.getMaxShardDataChangeListenerQueueSize().getValue(),
+                        props.getShardTransactionIdleTimeoutInMinutes().getValue(),
+                        props.getOperationTimeoutInSeconds().getValue()), bundleContext);
+    }
+
+    public void setBundleContext(BundleContext bundleContext) {
+        this.bundleContext = bundleContext;
     }
 }
index 67bf599..0cdaca3 100644 (file)
@@ -8,6 +8,29 @@
 * Do not modify this file unless it is present under src/main directory
 */
 package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
+
+import org.opendaylight.controller.config.api.DependencyResolver;
+import org.opendaylight.controller.config.api.DynamicMBeanWithInstance;
+import org.opendaylight.controller.config.spi.Module;
+import org.osgi.framework.BundleContext;
+
 public class DistributedConfigDataStoreProviderModuleFactory extends org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedConfigDataStoreProviderModuleFactory {
 
+    @Override
+    public Module createModule(String instanceName, DependencyResolver dependencyResolver, BundleContext bundleContext) {
+        DistributedConfigDataStoreProviderModule module = (DistributedConfigDataStoreProviderModule)super.createModule(instanceName,dependencyResolver,bundleContext);
+        module.setBundleContext(bundleContext);
+        return module;
+    }
+
+    @Override
+    public Module createModule(String instanceName, DependencyResolver dependencyResolver,
+        DynamicMBeanWithInstance old, BundleContext bundleContext) throws Exception {
+        DistributedConfigDataStoreProviderModule module = (DistributedConfigDataStoreProviderModule)super.createModule(instanceName, dependencyResolver,
+            old, bundleContext);
+        module.setBundleContext(bundleContext);
+        return module;
+    }
+
+
 }
index 443334d..c185e87 100644 (file)
@@ -2,9 +2,12 @@ package org.opendaylight.controller.config.yang.config.distributed_datastore_pro
 
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreProperties;
+import org.osgi.framework.BundleContext;
 
 public class DistributedOperationalDataStoreProviderModule extends
     org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedOperationalDataStoreProviderModule {
+    private BundleContext bundleContext;
+
     public DistributedOperationalDataStoreProviderModule(
         org.opendaylight.controller.config.api.ModuleIdentifier identifier,
         org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
@@ -34,10 +37,16 @@ public class DistributedOperationalDataStoreProviderModule extends
 
         return DistributedDataStoreFactory.createInstance("operational",
                 getOperationalSchemaServiceDependency(),
-                new DistributedDataStoreProperties(props.getMaxShardDataChangeExecutorPoolSize(),
-                        props.getMaxShardDataChangeExecutorQueueSize(),
-                        props.getMaxShardDataChangeListenerQueueSize(),
-                        props.getShardTransactionIdleTimeoutInMinutes()));
+                new DistributedDataStoreProperties(
+                        props.getMaxShardDataChangeExecutorPoolSize().getValue(),
+                        props.getMaxShardDataChangeExecutorQueueSize().getValue(),
+                        props.getMaxShardDataChangeListenerQueueSize().getValue(),
+                        props.getShardTransactionIdleTimeoutInMinutes().getValue(),
+                        props.getOperationTimeoutInSeconds().getValue()), bundleContext);
+    }
+
+    public void setBundleContext(BundleContext bundleContext) {
+        this.bundleContext = bundleContext;
     }
 
 }
index c9965fe..364fe62 100644 (file)
@@ -8,6 +8,27 @@
 * Do not modify this file unless it is present under src/main directory
 */
 package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
+
+import org.opendaylight.controller.config.api.DependencyResolver;
+import org.opendaylight.controller.config.api.DynamicMBeanWithInstance;
+import org.opendaylight.controller.config.spi.Module;
+import org.osgi.framework.BundleContext;
+
 public class DistributedOperationalDataStoreProviderModuleFactory extends org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedOperationalDataStoreProviderModuleFactory {
+    @Override
+    public Module createModule(String instanceName, DependencyResolver dependencyResolver, BundleContext bundleContext) {
+        DistributedOperationalDataStoreProviderModule module = (DistributedOperationalDataStoreProviderModule)super.createModule(instanceName,dependencyResolver,bundleContext);
+        module.setBundleContext(bundleContext);
+        return module;
+    }
+
+    @Override
+    public Module createModule(String instanceName, DependencyResolver dependencyResolver,
+        DynamicMBeanWithInstance old, BundleContext bundleContext) throws Exception {
+        DistributedOperationalDataStoreProviderModule module = (DistributedOperationalDataStoreProviderModule)super.createModule(instanceName, dependencyResolver,
+            old, bundleContext);
+        module.setBundleContext(bundleContext);
+        return module;
+    }
 
 }
index a9a8a1a..d50be2c 100644 (file)
@@ -36,30 +36,48 @@ module distributed-datastore-provider {
                 config:java-name-prefix DistributedOperationalDataStoreProvider;
      }
 
+    typedef non-zero-uint16-type {
+        type uint16 {
+            range "1..max";
+        }
+    }
+    
+    typedef operation-timeout-type {
+        type uint16 {
+            range "5..max";
+        }
+    }
+    
     grouping data-store-properties {
         leaf max-shard-data-change-executor-queue-size {
             default 1000;
-            type uint16;
+            type non-zero-uint16-type;
             description "The maximum queue size for each shard's data store data change notification executor.";
          }
 
          leaf max-shard-data-change-executor-pool-size {
             default 20;
-            type uint16;
+            type non-zero-uint16-type;
             description "The maximum thread pool size for each shard's data store data change notification executor.";
          }
 
          leaf max-shard-data-change-listener-queue-size {
             default 1000;
-            type uint16;
+            type non-zero-uint16-type;
             description "The maximum queue size for each shard's data store data change listeners.";
          }
          
          leaf shard-transaction-idle-timeout-in-minutes {
             default 10;
-            type uint16;
+            type non-zero-uint16-type;
             description "The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs.";
          }
+         
+         leaf operation-timeout-in-seconds {
+            default 5;
+            type operation-timeout-type;
+            description "The maximum amount of time for akka operations (remote or local) to complete before failing.";
+         }
     }
     
     // Augments the 'configuration' choice node under modules/module.
index e653c3d..2ed11cf 100644 (file)
@@ -82,8 +82,7 @@ public class DataChangeListenerProxyTest extends AbstractActorTest {
         ActorContext
             testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), new MockConfiguration());
         Object messages = testContext
-            .executeLocalOperation(actorRef, "messages",
-                ActorContext.ASK_DURATION);
+            .executeLocalOperation(actorRef, "messages");
 
         Assert.assertNotNull(messages);
 
index c99a7e8..3d0aaa0 100644 (file)
@@ -62,8 +62,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{
         ActorContext
             testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)),new MockClusterWrapper(), new MockConfiguration());
         Object messages = testContext
-            .executeLocalOperation(actorRef, "messages",
-                ActorContext.ASK_DURATION);
+            .executeLocalOperation(actorRef, "messages");
 
         Assert.assertNotNull(messages);
 
index e10570c..e39b9ab 100644 (file)
@@ -13,6 +13,7 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.TestActorRef;
+import akka.util.Timeout;
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -185,17 +186,17 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
 
             ).build();
 
+        Timeout askTimeout = new Timeout(ASK_RESULT_DURATION);
+
         //This is done so that Modification list is updated which is used during commit
-        Future future =
-            akka.pattern.Patterns.ask(shardTransaction, writeData, 3000);
+        Future<Object> future = akka.pattern.Patterns.ask(shardTransaction, writeData, askTimeout);
 
         //ready transaction creates the cohort so that we get into the
         //block where in commmit is done
         ShardTransactionMessages.ReadyTransaction readyTransaction =
             ShardTransactionMessages.ReadyTransaction.newBuilder().build();
 
-        future =
-            akka.pattern.Patterns.ask(shardTransaction, readyTransaction, 3000);
+        future = akka.pattern.Patterns.ask(shardTransaction, readyTransaction, askTimeout);
 
         //but when the message is sent it will have the MockCommit object
         //so that we can simulate throwing of exception
@@ -216,10 +217,7 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
         when(mockModification.toSerializable()).thenReturn(
             PersistentMessages.CompositeModification.newBuilder().build());
 
-        future =
-            akka.pattern.Patterns.ask(subject,
-                mockForwardCommitTransaction
-                , 3000);
+        future = akka.pattern.Patterns.ask(subject, mockForwardCommitTransaction, askTimeout);
         Await.result(future, ASK_RESULT_DURATION);
     }
 
index adb12b2..1cd0f85 100644 (file)
@@ -35,7 +35,6 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 
 import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.util.List;
 import java.util.concurrent.ExecutionException;
@@ -93,12 +92,12 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         }
 
         stubber.when(actorContext).executeRemoteOperationAsync(any(ActorSelection.class),
-                isA(requestType), any(FiniteDuration.class));
+                isA(requestType));
     }
 
     private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
         verify(actorContext, times(nCohorts)).executeRemoteOperationAsync(
-                any(ActorSelection.class), isA(requestType), any(FiniteDuration.class));
+                any(ActorSelection.class), isA(requestType));
     }
 
     private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
index f69ae88..e5392e0 100644 (file)
@@ -56,8 +56,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -216,10 +214,6 @@ public class TransactionProxyTest extends AbstractActorTest {
         return getSystem().actorSelection(actorRef.path());
     }
 
-    private FiniteDuration anyDuration() {
-        return any(FiniteDuration.class);
-    }
-
     private CreateTransactionReply createTransactionReply(ActorRef actorRef){
         return CreateTransactionReply.newBuilder()
             .setTransactionActorPath(actorRef.path().toString())
@@ -232,7 +226,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 when(mockActorContext).actorSelection(actorRef.path().toString());
         doReturn(createTransactionReply(actorRef)).when(mockActorContext).
                 executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD),
-                        eqCreateTransaction(memberName, type), anyDuration());
+                        eqCreateTransaction(memberName, type));
         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(
                 anyString(), eq(actorRef.path().toString()));
         doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString());
@@ -259,7 +253,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 READ_ONLY);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+                eq(actorSelection(actorRef)), eqReadData());
 
         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
@@ -269,7 +263,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+                eq(actorSelection(actorRef)), eqReadData());
 
         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
 
@@ -283,7 +277,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
-                executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+                executeRemoteOperationAsync(any(ActorSelection.class), any());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_ONLY);
@@ -296,7 +290,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+                executeRemoteOperationAsync(any(ActorSelection.class), any());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_ONLY);
@@ -308,7 +302,7 @@ public class TransactionProxyTest extends AbstractActorTest {
             throws Throwable {
 
         doThrow(exToThrow).when(mockActorContext).executeShardOperation(
-                anyString(), any(), anyDuration());
+                anyString(), any());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_ONLY);
@@ -348,14 +342,13 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
-                        anyDuration());
+                executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
 
         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+                eq(actorSelection(actorRef)), eqReadData());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_WRITE);
@@ -368,7 +361,7 @@ public class TransactionProxyTest extends AbstractActorTest {
             propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
         } finally {
             verify(mockActorContext, times(0)).executeRemoteOperationAsync(
-                    eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+                    eq(actorSelection(actorRef)), eqReadData());
         }
     }
 
@@ -379,10 +372,10 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqWriteData(expectedNode), anyDuration());
+                eq(actorSelection(actorRef)), eqWriteData(expectedNode));
 
         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+                eq(actorSelection(actorRef)), eqReadData());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_WRITE);
@@ -414,14 +407,14 @@ public class TransactionProxyTest extends AbstractActorTest {
                 READ_ONLY);
 
         doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+                eq(actorSelection(actorRef)), eqDataExists());
 
         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
 
         assertEquals("Exists response", false, exists);
 
         doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+                eq(actorSelection(actorRef)), eqDataExists());
 
         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
 
@@ -443,7 +436,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
-                executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+                executeRemoteOperationAsync(any(ActorSelection.class), any());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_ONLY);
@@ -456,7 +449,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+                executeRemoteOperationAsync(any(ActorSelection.class), any());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_ONLY);
@@ -471,14 +464,13 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
-                        anyDuration());
+                executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
 
         doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+                eq(actorSelection(actorRef)), eqDataExists());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_WRITE);
@@ -491,7 +483,7 @@ public class TransactionProxyTest extends AbstractActorTest {
             propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
         } finally {
             verify(mockActorContext, times(0)).executeRemoteOperationAsync(
-                    eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+                    eq(actorSelection(actorRef)), eqDataExists());
         }
     }
 
@@ -502,10 +494,10 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+                eq(actorSelection(actorRef)), eqDataExists());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_WRITE);
@@ -556,7 +548,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY);
@@ -564,7 +556,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
         verify(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
                 WriteDataReply.SERIALIZABLE_CLASS);
@@ -599,7 +591,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY);
@@ -607,7 +599,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
 
         verify(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
                 MergeDataReply.SERIALIZABLE_CLASS);
@@ -618,7 +610,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
 
         doReturn(deleteDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
+                eq(actorSelection(actorRef)), eqDeleteData());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY);
@@ -626,7 +618,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         transactionProxy.delete(TestModel.TEST_PATH);
 
         verify(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
+                eq(actorSelection(actorRef)), eqDeleteData());
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
                 DeleteDataReply.SERIALIZABLE_CLASS);
@@ -665,13 +657,13 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+                eq(actorSelection(actorRef)), eqReadData());
 
         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_WRITE);
@@ -700,14 +692,13 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite),
-                        anyDuration());
+                executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY);
@@ -736,11 +727,11 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
                 executeRemoteOperationAsync(eq(actorSelection(actorRef)),
-                        isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+                        isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY);
@@ -763,7 +754,7 @@ public class TransactionProxyTest extends AbstractActorTest {
     public void testReadyWithInitialCreateTransactionFailure() throws Exception {
 
         doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
-                anyString(), any(), anyDuration());
+                anyString(), any());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY);
@@ -793,11 +784,11 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
                 executeRemoteOperationAsync(eq(actorSelection(actorRef)),
-                        isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+                        isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY);
@@ -830,7 +821,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+                eq(actorSelection(actorRef)), eqReadData());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_WRITE);
index 7a21c8c..03aaace 100644 (file)
@@ -1,28 +1,42 @@
 package org.opendaylight.controller.cluster.datastore.modification;
 
 import com.google.common.base.Optional;
-import junit.framework.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+
 public class MutableCompositeModificationTest extends AbstractModificationTest {
 
-  @Test
-  public void testApply() throws Exception {
+    @Test
+    public void testApply() throws Exception {
+
+        MutableCompositeModification compositeModification = new MutableCompositeModification();
+        compositeModification.addModification(new WriteModification(TestModel.TEST_PATH,
+            ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()));
+
+        DOMStoreReadWriteTransaction transaction = store.newReadWriteTransaction();
+        compositeModification.apply(transaction);
+        commitTransaction(transaction);
+
+        Optional<NormalizedNode<?, ?>> data = readData(TestModel.TEST_PATH);
 
-    MutableCompositeModification compositeModification = new MutableCompositeModification();
-    compositeModification.addModification(new WriteModification(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()));
+        assertNotNull(data.get());
+        assertEquals(TestModel.TEST_QNAME, data.get().getNodeType());
+    }
 
-    DOMStoreReadWriteTransaction transaction = store.newReadWriteTransaction();
-    compositeModification.apply(transaction);
-    commitTransaction(transaction);
+    @Test
+    public void testEverySerializedCompositeModificationObjectMustBeDifferent(){
+        MutableCompositeModification compositeModification = new MutableCompositeModification();
+        compositeModification.addModification(new WriteModification(TestModel.TEST_PATH,
+            ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()));
 
-    Optional<NormalizedNode<?,?>> data = readData(TestModel.TEST_PATH);
+        assertNotEquals(compositeModification.toSerializable(), compositeModification.toSerializable());
 
-    Assert.assertNotNull(data.get());
-    Assert.assertEquals(TestModel.TEST_QNAME, data.get().getNodeType());
-  }
+    }
 }
index fda9ccd..5d8fb83 100644 (file)
@@ -117,7 +117,7 @@ public class ActorContextTest extends AbstractActorTest{
                         new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
                             mock(Configuration.class));
 
-                    Object out = actorContext.executeLocalShardOperation("default", "hello", duration("1 seconds"));
+                    Object out = actorContext.executeLocalShardOperation("default", "hello");
 
                     assertEquals("hello", out);
 
@@ -144,7 +144,7 @@ public class ActorContextTest extends AbstractActorTest{
                         new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
                             mock(Configuration.class));
 
-                    Object out = actorContext.executeLocalShardOperation("default", "hello", duration("1 seconds"));
+                    Object out = actorContext.executeLocalShardOperation("default", "hello");
 
                     assertNull(out);
 
@@ -232,7 +232,7 @@ public class ActorContextTest extends AbstractActorTest{
 
                     ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
 
-                    Object out = actorContext.executeRemoteOperation(actor, "hello", duration("3 seconds"));
+                    Object out = actorContext.executeRemoteOperation(actor, "hello");
 
                     assertEquals("hello", out);
 
@@ -261,8 +261,7 @@ public class ActorContextTest extends AbstractActorTest{
 
                     ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
 
-                    Future<Object> future = actorContext.executeRemoteOperationAsync(actor, "hello",
-                            Duration.create(3, TimeUnit.SECONDS));
+                    Future<Object> future = actorContext.executeRemoteOperationAsync(actor, "hello");
 
                     try {
                         Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
index b19fd3a..8fa3a17 100644 (file)
@@ -12,7 +12,6 @@ import static org.junit.Assert.assertNotNull;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
-import scala.concurrent.duration.FiniteDuration;
 
 public class MockActorContext extends ActorContext {
 
@@ -33,12 +32,12 @@ public class MockActorContext extends ActorContext {
 
 
     @Override public Object executeShardOperation(String shardName,
-        Object message, FiniteDuration duration) {
+        Object message) {
         return executeShardOperationResponse;
     }
 
     @Override public Object executeRemoteOperation(ActorSelection actor,
-        Object message, FiniteDuration duration) {
+        Object message) {
         return executeRemoteOperationResponse;
     }
 
@@ -76,13 +75,13 @@ public class MockActorContext extends ActorContext {
 
     @Override
     public Object executeLocalOperation(ActorRef actor,
-        Object message, FiniteDuration duration) {
+        Object message) {
         return this.executeLocalOperationResponse;
     }
 
     @Override
     public Object executeLocalShardOperation(String shardName,
-        Object message, FiniteDuration duration) {
+        Object message) {
         return this.executeLocalShardOperationResponse;
     }
 }
index 939096e..4ddba2f 100644 (file)
@@ -21,8 +21,7 @@ public class TestUtils {
         ActorContext testContext = new ActorContext(actorSystem, actorSystem.actorOf(
             Props.create(DoNothingActor.class)), new MockClusterWrapper(), new MockConfiguration());
         Object messages = testContext
-            .executeLocalOperation(actorRef, "messages",
-                ActorContext.ASK_DURATION);
+            .executeLocalOperation(actorRef, "messages");
 
         Assert.assertNotNull(messages);
 
index 41cdd59..8d454c4 100644 (file)
             <Export-package></Export-package>
             <Private-Package></Private-Package>
             <Import-Package>!org.iq80.*;!*snappy;!org.jboss.*;!com.jcraft.*;!org.fusesource.*;!*jetty*;!sun.security.*;*</Import-Package>
+            <!--
             <Embed-Dependency>
                 sal-clustering-commons;
                 sal-akka-raft;
                 *uncommons*;
             </Embed-Dependency>
             <Embed-Transitive>true</Embed-Transitive>
+          -->
           </instructions>
         </configuration>
       </plugin>
index f1ca3cc..6a442c5 100644 (file)
@@ -10,12 +10,16 @@ package org.opendaylight.controller.remote.rpc;
 
 import akka.actor.ActorSystem;
 import akka.osgi.BundleDelegatingClassLoader;
-import com.typesafe.config.ConfigFactory;
+import org.opendaylight.controller.remote.rpc.utils.AkkaConfigurationReader;
 import org.osgi.framework.BundleContext;
 
 
 public class ActorSystemFactory {
- private static volatile ActorSystem actorSystem = null;
+
+    public static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-rpc";
+    public static final String CONFIGURATION_NAME = "odl-cluster-rpc";
+
+    private static volatile ActorSystem actorSystem = null;
 
   public static final ActorSystem getInstance(){
      return actorSystem;
@@ -26,7 +30,7 @@ public class ActorSystemFactory {
    *
    * @param bundleContext
    */
-  public static final void createInstance(final BundleContext bundleContext) {
+  public static final void createInstance(final BundleContext bundleContext, AkkaConfigurationReader akkaConfigurationReader) {
     if(actorSystem == null) {
       // Create an OSGi bundle classloader for actor system
       BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
@@ -34,8 +38,8 @@ public class ActorSystemFactory {
       synchronized (ActorSystemFactory.class) {
         // Double check
         if (actorSystem == null) {
-          ActorSystem system = ActorSystem.create("opendaylight-cluster-rpc",
-              ConfigFactory.load().getConfig("odl-cluster-rpc"), classLoader);
+          ActorSystem system = ActorSystem.create(ACTOR_SYSTEM_NAME,
+              akkaConfigurationReader.read().getConfig(CONFIGURATION_NAME), classLoader);
           actorSystem = system;
         }
       }
@@ -43,4 +47,5 @@ public class ActorSystemFactory {
       throw new IllegalStateException("Actor system should be created only once. Use getInstance method to access existing actor system");
     }
   }
+
 }
index 4496bd3..7d7dbf0 100644 (file)
@@ -1,9 +1,13 @@
 package org.opendaylight.controller.remote.rpc;
 
+import static akka.pattern.Patterns.ask;
 import akka.actor.ActorRef;
-import com.google.common.util.concurrent.Futures;
+import akka.dispatch.OnComplete;
+import akka.util.Timeout;
+
 import com.google.common.util.concurrent.ListenableFuture;
-import org.opendaylight.controller.remote.rpc.messages.ErrorResponse;
+import com.google.common.util.concurrent.SettableFuture;
+
 import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
 import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
 import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
@@ -13,73 +17,82 @@ import org.opendaylight.controller.sal.core.api.RpcImplementation;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.concurrent.ExecutionContext;
+
 import java.util.Collections;
 import java.util.Set;
 
-public class RemoteRpcImplementation implements RpcImplementation,
-    RoutedRpcDefaultImplementation {
-  private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcImplementation.class);
-  private ActorRef rpcBroker;
-  private SchemaContext schemaContext;
-
-  public RemoteRpcImplementation(ActorRef rpcBroker, SchemaContext schemaContext) {
-    this.rpcBroker = rpcBroker;
-    this.schemaContext = schemaContext;
-  }
-
-  @Override
-  public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, YangInstanceIdentifier identifier, CompositeNode input) {
-    InvokeRpc rpcMsg = new InvokeRpc(rpc, identifier, input);
-
-    return executeMsg(rpcMsg);
-  }
-
-  @Override
-  public Set<QName> getSupportedRpcs() {
-    // TODO : check if we need to get this from routing registry
-    return Collections.emptySet();
-  }
-
-  @Override
-  public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
-    InvokeRpc rpcMsg = new InvokeRpc(rpc, null, input);
-    return executeMsg(rpcMsg);
-  }
-
-  private ListenableFuture<RpcResult<CompositeNode>> executeMsg(Object rpcMsg) {
-    ListenableFuture<RpcResult<CompositeNode>> listenableFuture = null;
-
-    try {
-      Object response = ActorUtil.executeOperation(rpcBroker, rpcMsg, ActorUtil.ASK_DURATION, ActorUtil.AWAIT_DURATION);
-      if(response instanceof RpcResponse) {
-
-        RpcResponse rpcResponse = (RpcResponse) response;
-        CompositeNode result = XmlUtils.xmlToCompositeNode(rpcResponse.getResultCompositeNode());
-        listenableFuture = Futures.immediateFuture(RpcResultBuilder.success(result).build());
-
-      } else if(response instanceof ErrorResponse) {
-
-        ErrorResponse errorResponse = (ErrorResponse) response;
-        Exception e = errorResponse.getException();
-        final RpcResultBuilder<CompositeNode> failed = RpcResultBuilder.failed();
-        failed.withError(null, null, e.getMessage(), null, null, e.getCause());
-        listenableFuture = Futures.immediateFuture(failed.build());
-
-      }
-    } catch (Exception e) {
-      LOG.error("Error occurred while invoking RPC actor {}", e);
-
-      final RpcResultBuilder<CompositeNode> failed = RpcResultBuilder.failed();
-      failed.withError(null, null, e.getMessage(), null, null, e.getCause());
-      listenableFuture = Futures.immediateFuture(failed.build());
+public class RemoteRpcImplementation implements RpcImplementation, RoutedRpcDefaultImplementation {
+    private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcImplementation.class);
+    private final ActorRe