Merge "Bug 2358: Fixed warnings in Restconf"
authorEd Warnicke <hagbard@gmail.com>
Thu, 16 Apr 2015 03:55:18 +0000 (03:55 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 16 Apr 2015 03:55:18 +0000 (03:55 +0000)
146 files changed:
features/netconf/src/main/resources/features.xml
karaf/karaf-parent/pom.xml
karaf/opendaylight-karaf-resources/src/main/resources/bin/instance [changed mode: 0755->0644]
karaf/opendaylight-karaf-resources/src/main/resources/bin/instance.bat
karaf/opendaylight-karaf-resources/src/main/resources/bin/karaf
karaf/opendaylight-karaf-resources/src/main/resources/bin/karaf.bat
karaf/opendaylight-karaf-resources/src/main/resources/bin/setenv [changed mode: 0755->0644]
karaf/opendaylight-karaf-resources/src/main/resources/bin/setenv.bat
karaf/opendaylight-karaf-resources/src/main/resources/etc/custom.properties
karaf/opendaylight-karaf-resources/src/main/resources/etc/jre.properties
karaf/opendaylight-karaf/pom.xml
opendaylight/md-sal/mdsal-artifacts/pom.xml
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleFactoryTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceRegistrationImplTest.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopicTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopologyTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceManagerTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotificationTest.java
opendaylight/md-sal/pom.xml
opendaylight/md-sal/sal-akka-raft-example/pom.xml [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft-example/src/main/java/opendaylight/controller/cluster/example/ClientActor.java [moved from opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ClientActor.java with 100% similarity]
opendaylight/md-sal/sal-akka-raft-example/src/main/java/opendaylight/controller/cluster/example/ExampleActor.java [moved from opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java with 86% similarity]
opendaylight/md-sal/sal-akka-raft-example/src/main/java/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java [moved from opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java with 100% similarity]
opendaylight/md-sal/sal-akka-raft-example/src/main/java/opendaylight/controller/cluster/example/ExampleRoleChangeListener.java [moved from opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleRoleChangeListener.java with 100% similarity]
opendaylight/md-sal/sal-akka-raft-example/src/main/java/opendaylight/controller/cluster/example/LogGenerator.java [moved from opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/LogGenerator.java with 100% similarity]
opendaylight/md-sal/sal-akka-raft-example/src/main/java/opendaylight/controller/cluster/example/Main.java [moved from opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java with 100% similarity]
opendaylight/md-sal/sal-akka-raft-example/src/main/java/opendaylight/controller/cluster/example/TestDriver.java [moved from opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java with 100% similarity]
opendaylight/md-sal/sal-akka-raft-example/src/main/java/opendaylight/controller/cluster/example/messages/KeyValue.java [moved from opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java with 92% similarity]
opendaylight/md-sal/sal-akka-raft-example/src/main/java/opendaylight/controller/cluster/example/messages/KeyValueSaved.java [moved from opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValueSaved.java with 100% similarity]
opendaylight/md-sal/sal-akka-raft-example/src/main/java/opendaylight/controller/cluster/example/messages/PrintRole.java [moved from opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/PrintRole.java with 100% similarity]
opendaylight/md-sal/sal-akka-raft-example/src/main/java/opendaylight/controller/cluster/example/messages/PrintState.java [moved from opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/PrintState.java with 100% similarity]
opendaylight/md-sal/sal-akka-raft-example/src/main/java/opendaylight/controller/cluster/example/messages/RegisterListener.java [moved from opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/RegisterListener.java with 100% similarity]
opendaylight/md-sal/sal-akka-raft-example/src/main/java/opendaylight/controller/cluster/example/messages/SetNotifiers.java [moved from opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/SetNotifiers.java with 100% similarity]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/DeleteEntries.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ElectionTermImplTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTestKit.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/base/messages/DeleteEntriesTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataObjectModification.java
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataTreeIdentifier.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDOMNotificationListenerAdapter.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDOMNotificationServiceAdapter.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/LazyDataObjectModification.java
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/AbstractDataServiceTest.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/md/sal/binding/data/ConcurrentImplicitCreateTest.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/md/sal/binding/data/WildcardedDataChangeListenerTest.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/bugfix/DeleteNestedAugmentationListenParentTest.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/bugfix/WriteParentListenAugmentTest.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/bugfix/WriteParentReadChildTest.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/connect/dom/BrokerIntegrationTest.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/connect/dom/CrossBrokerMountPointTest.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/connect/dom/DOMRpcServiceTestBugfix560.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DataPersistenceProvider.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DelegatingPersistentDataProvider.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/NonPersistentDataProvider.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/PersistentDataProvider.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractConfig.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/CommonConfig.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/DataPersistenceProviderMonitor.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/NormalizedNodeSerializer.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/CompositeModificationByteStringPayload.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/CompositeModificationPayload.java
opendaylight/md-sal/sal-distributed-datastore/pom.xml
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionFutureCallback.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/WriteOnlyTransactionContextImpl.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/MessageTracker.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregator.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReplyTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregatorTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/PingPongFuture.java
opendaylight/md-sal/sal-dummy-distributed-datastore/pom.xml
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/schema/mapping/NetconfMessageTransformer.java
opendaylight/netconf/config-netconf-connector/src/test/java/org/opendaylight/controller/netconf/confignetconfconnector/NetconfMappingTest.java
opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/RuntimeRpc.java
opendaylight/netconf/mdsal-netconf-connector/src/test/java/org/opendaylight/controller/netconf/mdsal/connector/ops/RuntimeRpcTest.java
opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/rpc-container-control.xml [new file with mode: 0644]
opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/rpc-container.xml [new file with mode: 0644]
opendaylight/netconf/mdsal-netconf-connector/src/test/resources/yang/mdsal-netconf-rpc-test.yang
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSession.java
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSessionListener.java
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/mapping/operations/DefaultCloseSession.java
opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/mapping/operations/DefaultCloseSessionTest.java
opendaylight/netconf/netconf-util/pom.xml
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/OrderedNormalizedNodeWriter.java [new file with mode: 0644]
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/xml/XmlElement.java

index dbd940f..80b2e36 100644 (file)
@@ -36,6 +36,7 @@
   <feature name='odl-netconf-util' version='${project.version}'>
     <feature version='${project.version}'>odl-netconf-mapping-api</feature>
     <bundle>mvn:org.opendaylight.yangtools/yang-model-api/${yangtools.version}</bundle>
+    <bundle>mvn:org.opendaylight.yangtools/yang-data-api/${yangtools.version}</bundle>
     <bundle>mvn:org.opendaylight.controller/netconf-util/${project.version}</bundle>
   </feature>
     <feature name='odl-netconf-impl' version='${project.version}' description="OpenDaylight :: Netconf :: Impl">
index 2d914b3..958f2b8 100644 (file)
@@ -23,6 +23,7 @@ and is available at http://www.eclipse.org/legal/epl-v10.html
   <properties>
     <branding.version>1.1.0-SNAPSHOT</branding.version>
     <karaf.resources.version>1.5.0-SNAPSHOT</karaf.resources.version>
+    <karaf.localFeature>standard</karaf.localFeature>
   </properties>
   <dependencyManagement>
     <dependencies>
@@ -47,6 +48,15 @@ and is available at http://www.eclipse.org/legal/epl-v10.html
     </dependencies>
   </dependencyManagement>
   <dependencies>
+    <!-- karaf standard features -->
+    <dependency>
+      <groupId>org.apache.karaf.features</groupId>
+      <artifactId>standard</artifactId>
+      <version>${karaf.version}</version>
+      <type>xml</type>
+      <classifier>features</classifier>
+      <scope>runtime</scope>
+    </dependency>
     <!-- ODL Branding -->
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
old mode 100755 (executable)
new mode 100644 (file)
index 7288042..27772fd
@@ -85,7 +85,7 @@ unlimitFD() {
     # Increase the maximum file descriptors if we can
     if [ "$os400" = "false" ] && [ "$cygwin" = "false" ]; then
         MAX_FD_LIMIT=`ulimit -H -n`
-        if [ "$MAX_FD_LIMIT" != 'unlimited' ]; then 
+        if [ "$MAX_FD_LIMIT" != 'unlimited' ]; then
             if [ $? -eq 0 ]; then
                 if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ]; then
                     # use the system max
@@ -188,7 +188,7 @@ locateJava() {
     fi
 
     if [ "x$JAVA_HOME" = "x" ] && [ "$darwin" = "true" ]; then
-        JAVA_HOME="$(/usr/libexec/java_home)"
+        JAVA_HOME="$(/usr/libexec/java_home -v 1.7)"
     fi
     if [ "x$JAVA" = "x" ] && [ -r /etc/gentoo-release ] ; then
         JAVA_HOME=`java-config --jre-home`
@@ -202,7 +202,7 @@ locateJava() {
         else
             warn "JAVA_HOME not set; results may vary"
             JAVA=`type java`
-            JAVA=`expr "$JAVA" : '.*is \(.*\)$'`
+            JAVA=`expr "$JAVA" : '.* \(/.*\)$'`
             if [ "x$JAVA" = "x" ]; then
                 die "java command not found"
             fi
@@ -234,6 +234,10 @@ setupDebugOptions() {
     fi
     export JAVA_OPTS
 
+    if [ "x$EXTRA_JAVA_OPTS" != "x" ]; then
+        JAVA_OPTS="$JAVA_OPTS $EXTRA_JAVA_OPTS"
+    fi
+
     # Set Debug options if enabled
     if [ "x$KARAF_DEBUG" != "x" ]; then
         # Use the defaults if JAVA_DEBUG_OPTS was not set
@@ -280,7 +284,12 @@ setupDefaults() {
             CLASSPATH="$CLASSPATH:$file"
         fi
     done
-    DEFAULT_JAVA_DEBUG_OPTS="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"
+
+    DEFAULT_JAVA_DEBUG_PORT="5005"
+    if [ "x$JAVA_DEBUG_PORT" = "x" ]; then
+        JAVA_DEBUG_PORT="$DEFAULT_JAVA_DEBUG_PORT"
+    fi
+    DEFAULT_JAVA_DEBUG_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=$JAVA_DEBUG_PORT"
 
     ##
     ## TODO: Move to conf/profiler/yourkit.{sh|cmd}
@@ -327,7 +336,7 @@ init() {
 
 run() {
 
-    CLASSPATH="${KARAF_HOME}/system/org/apache/karaf/instance/org.apache.karaf.instance.command/3.0.1/org.apache.karaf.instance.command-3.0.1.jar:${KARAF_HOME}/system/org/apache/karaf/instance/org.apache.karaf.instance.core/3.0.1/org.apache.karaf.instance.core-3.0.1.jar:${KARAF_HOME}/system/org/apache/karaf/shell/org.apache.karaf.shell.console/3.0.1/org.apache.karaf.shell.console-3.0.1.jar:${KARAF_HOME}/system/org/apache/karaf/shell/org.apache.karaf.shell.table/3.0.1/org.apache.karaf.shell.table-3.0.1.jar:${KARAF_HOME}/system/org/apache/aries/blueprint/org.apache.aries.blueprint.api/1.0.0/org.apache.aries.blueprint.api-1.0.0.jar:${KARAF_HOME}/system/org/apache/aries/blueprint/org.apache.aries.blueprint.core/1.4.0/org.apache.aries.blueprint.core-1.4.0.jar:${KARAF_HOME}/system/org/apache/aries/blueprint/org.apache.aries.blueprint.cm/1.0.3/org.apache.aries.blueprint.cm-1.0.3.jar:${KARAF_HOME}/system/org/ops4j/pax/logging/pax-logging-api/1.7.2/pax-logging-api-1.7.2.jar:${KARAF_HOME}/system/org/apache/felix/org.apache.felix.framework/4.2.1/org.apache.felix.framework-4.2.1.jar:${KARAF_HOME}/system/jline/jline/2.11/jline-2.11.jar:$CLASSPATH"
+    CLASSPATH="${KARAF_HOME}/system/org/apache/karaf/instance/org.apache.karaf.instance.command/3.0.3/org.apache.karaf.instance.command-3.0.3.jar:${KARAF_HOME}/system/org/apache/karaf/instance/org.apache.karaf.instance.core/3.0.3/org.apache.karaf.instance.core-3.0.3.jar:${KARAF_HOME}/system/org/apache/karaf/shell/org.apache.karaf.shell.console/3.0.3/org.apache.karaf.shell.console-3.0.3.jar:${KARAF_HOME}/system/org/apache/karaf/shell/org.apache.karaf.shell.table/3.0.3/org.apache.karaf.shell.table-3.0.3.jar:${KARAF_HOME}/system/org/apache/aries/blueprint/org.apache.aries.blueprint.api/1.0.1/org.apache.aries.blueprint.api-1.0.1.jar:${KARAF_HOME}/system/org/apache/aries/blueprint/org.apache.aries.blueprint.core/1.4.2/org.apache.aries.blueprint.core-1.4.2.jar:${KARAF_HOME}/system/org/apache/aries/blueprint/org.apache.aries.blueprint.cm/1.0.5/org.apache.aries.blueprint.cm-1.0.5.jar:${KARAF_HOME}/system/org/ops4j/pax/logging/pax-logging-api/1.8.1/pax-logging-api-1.8.1.jar:${KARAF_HOME}/system/org/apache/felix/org.apache.felix.framework/4.2.1/org.apache.felix.framework-4.2.1.jar:${KARAF_HOME}/system/jline/jline/2.12/jline-2.12.jar:$CLASSPATH"
 
     if $cygwin; then
         KARAF_HOME=`cygpath --path --windows "$KARAF_HOME"`
@@ -346,4 +355,3 @@ main() {
 }
 
 main "$@"
-
index 49c2c0f..2ac8db1 100644 (file)
@@ -95,7 +95,7 @@ if "%KARAF_ETC%" == "" (
 )\r
 \r
 set DEFAULT_JAVA_OPTS=\r
-set DEFAULT_JAVA_DEBUG_OPTS=-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005\r
+set DEFAULT_JAVA_DEBUG_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005\r
 \r
 rem Support for loading native libraries\r
 set PATH=%PATH%;%KARAF_BASE%\lib;%KARAF_HOME%\lib\r
@@ -113,11 +113,15 @@ if not "%JAVA%" == "" goto :Check_JAVA_END
 \r
 if "%JAVA_OPTS%" == "" set JAVA_OPTS=%DEFAULT_JAVA_OPTS%\r
 \r
+if "%EXTRA_JAVA_OPTS%" == "" goto :KARAF_EXTRA_JAVA_OPTS_END\r
+    set JAVA_OPTS="%JAVA_OPTS% %EXTRA_JAVA_OPTS%"\r
+:KARAF_EXTRA_JAVA_OPTS_END\r
+\r
 if "%KARAF_DEBUG%" == "" goto :KARAF_DEBUG_END\r
     rem Use the defaults if JAVA_DEBUG_OPTS was not set\r
     if "%JAVA_DEBUG_OPTS%" == "" set JAVA_DEBUG_OPTS=%DEFAULT_JAVA_DEBUG_OPTS%\r
 \r
-    set "JAVA_OPTS=%JAVA_DEBUG_OPTS% %JAVA_OPTS%"\r
+    set JAVA_OPTS="%JAVA_DEBUG_OPTS% %JAVA_OPTS%"\r
     call :warn Enabling Java debug options: %JAVA_DEBUG_OPTS%\r
 :KARAF_DEBUG_END\r
 \r
@@ -135,7 +139,7 @@ goto :EOF
 \r
 :CLASSPATH_END\r
 \r
-set CLASSPATH=%KARAF_HOME%\system\org\apache\karaf\instance\org.apache.karaf.instance.command\3.0.1\org.apache.karaf.instance.command-3.0.1.jar;%KARAF_HOME%\system\org\apache\karaf\instance\org.apache.karaf.instance.core\3.0.1\org.apache.karaf.instance.core-3.0.1.jar;%KARAF_HOME%\system\org\apache\karaf\shell\org.apache.karaf.shell.console\3.0.1\org.apache.karaf.shell.console-3.0.1.jar;%KARAF_HOME%\system\org\apache\karaf\shell\org.apache.karaf.shell.table\3.0.1\org.apache.karaf.shell.table-3.0.1.jar;%KARAF_HOME%\system\org\apache\aries\blueprint\org.apache.aries.blueprint.api\1.0.0\org.apache.aries.blueprint.api-1.0.0.jar;%KARAF_HOME%\system\org\apache\aries\blueprint\org.apache.aries.blueprint.core\1.4.0\org.apache.aries.blueprint.core-1.4.0.jar;%KARAF_HOME%\system\org\apache\aries\blueprint\org.apache.aries.blueprint.cm\1.0.3\org.apache.aries.blueprint.cm-1.0.3.jar;%KARAF_HOME%\system\org\ops4j\pax\logging\pax-logging-api\1.7.2\pax-logging-api-1.7.2.jar;%KARAF_HOME%\system\org\apache\felix\org.apache.felix.framework\4.2.1\org.apache.felix.framework-4.2.1.jar;%KARAF_HOME%\system\jline\jline\2.11\jline-2.11.jar;%CLASSPATH%\r
+set CLASSPATH=%KARAF_HOME%\system\org\apache\karaf\instance\org.apache.karaf.instance.command\3.0.3\org.apache.karaf.instance.command-3.0.3.jar;%KARAF_HOME%\system\org\apache\karaf\instance\org.apache.karaf.instance.core\3.0.3\org.apache.karaf.instance.core-3.0.3.jar;%KARAF_HOME%\system\org\apache\karaf\shell\org.apache.karaf.shell.console\3.0.3\org.apache.karaf.shell.console-3.0.3.jar;%KARAF_HOME%\system\org\apache\karaf\shell\org.apache.karaf.shell.table\3.0.3\org.apache.karaf.shell.table-3.0.3.jar;%KARAF_HOME%\system\org\apache\aries\blueprint\org.apache.aries.blueprint.api\1.0.1\org.apache.aries.blueprint.api-1.0.1.jar;%KARAF_HOME%\system\org\apache\aries\blueprint\org.apache.aries.blueprint.core\1.4.2\org.apache.aries.blueprint.core-1.4.2.jar;%KARAF_HOME%\system\org\apache\aries\blueprint\org.apache.aries.blueprint.cm\1.0.5\org.apache.aries.blueprint.cm-1.0.5.jar;%KARAF_HOME%\system\org\ops4j\pax\logging\pax-logging-api\1.8.1\pax-logging-api-1.8.1.jar;%KARAF_HOME%\system\org\apache\felix\org.apache.felix.framework\4.2.1\org.apache.felix.framework-4.2.1.jar;%KARAF_HOME%\system\jline\jline\2.12\jline-2.12.jar;%CLASSPATH%\r
 \r
 :EXECUTE\r
     if "%SHIFT%" == "true" SET ARGS=%2 %3 %4 %5 %6 %7 %8\r
@@ -148,4 +152,3 @@ rem # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
 :END\r
 \r
 endlocal\r
-\r
index cad052a..23fbbec 100755 (executable)
@@ -16,8 +16,8 @@
 #    limitations under the License.
 #
 
-DIRNAME=`dirname $0`
-PROGNAME=`basename $0`
+DIRNAME=`dirname "$0"`
+PROGNAME=`basename "$0"`
 
 #
 # Sourcing environment settings for karaf similar to tomcats setenv
@@ -92,7 +92,7 @@ unlimitFD() {
     # Increase the maximum file descriptors if we can
     if [ "$os400" = "false" ] && [ "$cygwin" = "false" ]; then
         MAX_FD_LIMIT=`ulimit -H -n`
-        if [ "$MAX_FD_LIMIT" != 'unlimited' ]; then 
+        if [ "$MAX_FD_LIMIT" != 'unlimited' ]; then
             if [ $? -eq 0 ]; then
                 if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ]; then
                     # use the system max
@@ -195,7 +195,7 @@ locateJava() {
     fi
 
        if [ "x$JAVA_HOME" = "x" ] && [ "$darwin" = "true" ]; then
-               JAVA_HOME="$(/usr/libexec/java_home)"
+               JAVA_HOME="$(/usr/libexec/java_home -v 1.7)"
        fi
     if [ "x$JAVA" = "x" ] && [ -r /etc/gentoo-release ] ; then
         JAVA_HOME=`java-config --jre-home`
@@ -209,7 +209,7 @@ locateJava() {
         else
             warn "JAVA_HOME not set; results may vary"
             JAVA=`type java`
-            JAVA=`expr "$JAVA" : '.*is \(.*\)$'`
+            JAVA=`expr "$JAVA" : '.* \(/.*\)$'`
             if [ "x$JAVA" = "x" ]; then
                 die "java command not found"
             fi
@@ -237,7 +237,7 @@ detectJVM() {
 
 checkJvmVersion() {
    # echo "`$JAVA -version`"
-   VERSION=`$JAVA -version 2>&1 | egrep '"([0-9].[0-9]\..*[0-9])"' | awk '{print substr($3,2,length($3)-2)}' | awk '{print substr($1, 3, 3)}' | sed -e 's;\.;;g'`
+   VERSION=`$JAVA -version 2>&1 | egrep '"([0-9].[0-9]\..*[0-9]).*"' | awk '{print substr($3,2,length($3)-2)}' | awk '{print substr($1, 3, 3)}' | sed -e 's;\.;;g'`
    # echo $VERSION
    if [ "$VERSION" -lt "60" ]; then
        echo "JVM must be greater than 1.6"
@@ -251,6 +251,10 @@ setupDebugOptions() {
     fi
     export JAVA_OPTS
 
+    if [ "x$EXTRA_JAVA_OPTS" != "x" ]; then
+        JAVA_OPTS="$JAVA_OPTS $EXTRA_JAVA_OPTS"
+    fi
+
     # Set Debug options if enabled
     if [ "x$KARAF_DEBUG" != "x" ]; then
         # Ignore DEBUG in case of stop or client mode
@@ -304,7 +308,12 @@ setupDefaults() {
             CLASSPATH="$CLASSPATH:$file"
         fi
     done
-    DEFAULT_JAVA_DEBUG_OPTS="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"
+
+    DEFAULT_JAVA_DEBUG_PORT="5005"
+    if [ "x$JAVA_DEBUG_PORT" = "x" ]; then
+        JAVA_DEBUG_PORT="$DEFAULT_JAVA_DEBUG_PORT"
+    fi
+    DEFAULT_JAVA_DEBUG_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=$JAVA_DEBUG_PORT"
 
     ##
     ## TODO: Move to conf/profiler/yourkit.{sh|cmd}
@@ -340,7 +349,7 @@ init() {
 
     # Determine the JVM vendor
     detectJVM
-    
+
     # Determine the JVM version >= 1.6
     checkJvmVersion
 
index a450877..9c278c3 100644 (file)
@@ -96,26 +96,9 @@ if "%KARAF_ETC%" == "" (
 \r
 set LOCAL_CLASSPATH=%CLASSPATH%\r
 set JAVA_MODE=-server\r
-if not exist "%JAVA_HOME%\bin\server\jvm.dll" (\r
-    if not exist "%JAVA_HOME%\jre\bin\server\jvm.dll" (\r
-        echo WARNING: Running karaf on a Java HotSpot Client VM because server-mode is not available.\r
-        echo Install Java Developer Kit to fix this.\r
-        echo For more details see http://java.sun.com/products/hotspot/whitepaper.html#client\r
-        set JAVA_MODE=-client\r
-    )\r
-)\r
-set DEFAULT_JAVA_OPTS=%JAVA_MODE% -Xms%JAVA_MIN_MEM% -Xmx%JAVA_MAX_MEM% -Dderby.system.home="%KARAF_DATA%\derby" -Dderby.storage.fileSyncTransactionLog=true -Dcom.sun.management.jmxremote  -XX:+UnlockDiagnosticVMOptions -XX:+UnsyncloadClass\r
-\r
-rem Check some easily accessible MIN/MAX params for JVM mem usage\r
-if not "%JAVA_PERM_MEM%" == "" (\r
-    set DEFAULT_JAVA_OPTS=%DEFAULT_JAVA_OPTS% -XX:PermSize=%JAVA_PERM_MEM%\r
-)\r
-if not "%JAVA_MAX_PERM_MEM%" == "" (\r
-    set DEFAULT_JAVA_OPTS=%DEFAULT_JAVA_OPTS% -XX:MaxPermSize=%JAVA_MAX_PERM_MEM%\r
-)\r
 \r
 set CLASSPATH=%LOCAL_CLASSPATH%;%KARAF_BASE%\conf\r
-set DEFAULT_JAVA_DEBUG_OPTS=-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005\r
+set DEFAULT_JAVA_DEBUG_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005\r
 \r
 if "%LOCAL_CLASSPATH%" == "" goto :KARAF_CLASSPATH_EMPTY\r
     set CLASSPATH=%LOCAL_CLASSPATH%;%KARAF_BASE%\conf\r
@@ -206,8 +189,8 @@ if not "%JAVA%" == "" goto :Check_JAVA_END
     )\r
     if not exist "%JAVA_HOME%" (\r
        goto TryRegJDK\r
-    )\r
-       goto TryJDKEnd\r
+       )\r
+       goto TryJDKEnd\r
 :TryRegJDK\r
     rem try getting the JAVA_HOME from registry\r
     FOR /F "usebackq tokens=3*" %%A IN (`REG QUERY "HKLM\Software\JavaSoft\Java Development Kit" /v CurrentVersion`) DO (\r
@@ -219,7 +202,7 @@ if not "%JAVA%" == "" goto :Check_JAVA_END
     if not exist "%JAVA_HOME%" (\r
        call :warn Unable to retrieve JAVA_HOME from Registry\r
     )\r
-        goto TryJDKEnd\r
+       goto TryJDKEnd\r
 :TryJDKEnd\r
     if not exist "%JAVA_HOME%" (\r
         call :warn JAVA_HOME is not valid: "%JAVA_HOME%"\r
@@ -228,15 +211,37 @@ if not "%JAVA%" == "" goto :Check_JAVA_END
     set JAVA=%JAVA_HOME%\bin\java\r
 :Check_JAVA_END\r
 \r
+if not exist "%JAVA_HOME%\bin\server\jvm.dll" (\r
+    if not exist "%JAVA_HOME%\jre\bin\server\jvm.dll" (\r
+        echo WARNING: Running Karaf on a Java HotSpot Client VM because server-mode is not available.\r
+        echo Install Java Developer Kit to fix this.\r
+        echo For more details see http://java.sun.com/products/hotspot/whitepaper.html#client\r
+        set JAVA_MODE=-client\r
+    )\r
+)\r
+set DEFAULT_JAVA_OPTS=%JAVA_MODE% -Xms%JAVA_MIN_MEM% -Xmx%JAVA_MAX_MEM% -Dderby.system.home="%KARAF_DATA%\derby" -Dderby.storage.fileSyncTransactionLog=true -Dcom.sun.management.jmxremote  -XX:+UnlockDiagnosticVMOptions -XX:+UnsyncloadClass\r
+\r
+rem Check some easily accessible MIN/MAX params for JVM mem usage\r
+if not "%JAVA_PERM_MEM%" == "" (\r
+    set DEFAULT_JAVA_OPTS=%DEFAULT_JAVA_OPTS% -XX:PermSize=%JAVA_PERM_MEM%\r
+)\r
+if not "%JAVA_MAX_PERM_MEM%" == "" (\r
+    set DEFAULT_JAVA_OPTS=%DEFAULT_JAVA_OPTS% -XX:MaxPermSize=%JAVA_MAX_PERM_MEM%\r
+)\r
+\r
 if "%JAVA_OPTS%" == "" set JAVA_OPTS=%DEFAULT_JAVA_OPTS%\r
 \r
+if "%EXTRA_JAVA_OPTS%" == "" goto :KARAF_EXTRA_JAVA_OPTS_END\r
+    set JAVA_OPTS=%JAVA_OPTS% %EXTRA_JAVA_OPTS%\r
+:KARAF_EXTRA_JAVA_OPTS_END\r
+\r
 if "%KARAF_DEBUG%" == "" goto :KARAF_DEBUG_END\r
     if "%1" == "stop" goto :KARAF_DEBUG_END\r
     if "%1" == "client" goto :KARAF_DEBUG_END\r
     rem Use the defaults if JAVA_DEBUG_OPTS was not set\r
     if "%JAVA_DEBUG_OPTS%" == "" set JAVA_DEBUG_OPTS=%DEFAULT_JAVA_DEBUG_OPTS%\r
 \r
-    set "JAVA_OPTS=%JAVA_DEBUG_OPTS% %JAVA_OPTS%"\r
+    set JAVA_OPTS=%JAVA_DEBUG_OPTS% %JAVA_OPTS%\r
     call :warn Enabling Java debug options: %JAVA_DEBUG_OPTS%\r
 :KARAF_DEBUG_END\r
 \r
@@ -314,7 +319,7 @@ if "%KARAF_PROFILER%" == "" goto :RUN
 \r
 :EXECUTE_DEBUG\r
     if "%JAVA_DEBUG_OPTS%" == "" set JAVA_DEBUG_OPTS=%DEFAULT_JAVA_DEBUG_OPTS%\r
-    set "JAVA_OPTS=%JAVA_DEBUG_OPTS% %JAVA_OPTS%"\r
+    set JAVA_OPTS=%JAVA_DEBUG_OPTS% %JAVA_OPTS%\r
     shift\r
     goto :RUN_LOOP\r
 \r
@@ -333,4 +338,3 @@ endlocal
 if not "%PAUSE%" == "" pause\r
 \r
 :END_NO_PAUSE\r
-\r
old mode 100755 (executable)
new mode 100644 (file)
index 947c65f..42330fa
@@ -32,7 +32,7 @@
 #
 
 #
-# The following section shows the possible configuration options for the default 
+# The following section shows the possible configuration options for the default
 # karaf scripts
 #
 # export JAVA_HOME # Location of Java installation
 # export JAVA_MAX_MEM # Maximum memory for the JVM
 # export JAVA_PERM_MEM # Minimum perm memory for the JVM
 # export JAVA_MAX_PERM_MEM # Maximum perm memory for the JVM
+# export EXTRA_JAVA_OPTS # Additional JVM options
 # export KARAF_HOME # Karaf home folder
 # export KARAF_DATA # Karaf data folder
 # export KARAF_BASE # Karaf base folder
 # export KARAF_ETC  # Karaf etc  folder
 # export KARAF_OPTS # Additional available Karaf options
 # export KARAF_DEBUG # Enable debug mode
+# export KARAF_REDIRECT # Enable/set the std/err redirection when using bin/start
 if [ "x$JAVA_MAX_PERM_MEM" = "x" ]; then
     export JAVA_MAX_PERM_MEM="512m"
 fi
 if [ "x$JAVA_MAX_MEM" = "x" ]; then
     export JAVA_MAX_MEM="2048m"
 fi
-
index 7c61920..66a25a3 100644 (file)
@@ -48,6 +48,8 @@ rem Minimum perm memory for the JVM
 rem SET JAVA_PERM_MEM
 rem Maximum perm memory for the JVM
 rem SET JAVA_MAX_PERM_MEM
+rem Additional JVM options
+rem SET EXTRA_JAVA_OPTS
 rem Karaf home folder
 rem SET KARAF_HOME
 rem Karaf data folder
index 4a8f5ae..e726b80 100644 (file)
@@ -132,8 +132,14 @@ java.util.logging.config.file=configuration/tomcat-logging.properties
 hosttracker.keyscheme=IP
 
 # LISP Flow Mapping configuration
-# Map-Register messages overwrite existing RLOC sets in EID-to-RLOC mappings
+# Map-Register messages overwrite existing RLOC sets in EID-to-RLOC mappings (default: true)
 lisp.mappingOverwrite = true
-# Enable the Solicit-Map-Request (SMR) mechanism
-lisp.smr = false
+# Enable the Solicit-Map-Request (SMR) mechanism (default: true)
+lisp.smr = true
+# Choose policy for Explicit Locator Path (ELP) handling
+# There are three options:
+#   default: don't add or remove locator records, return mapping as-is
+#   both: keep the ELP, but add the next hop as a standalone non-LCAF locator with a lower priority
+#   replace: remove the ELP, add the next hop as a standalone non-LCAF locator
+lisp.elpPolicy = default
 
index f32078a..bfdef62 100644 (file)
@@ -27,8 +27,8 @@ jre-1.6= \
  javax.accessibility, \
  javax.activation;version="1.1", \
  javax.activity, \
- javax.annotation;version="1.1", \
- javax.annotation.processing;version="1.1", \
+ javax.annotation;version="1.0", \
+ javax.annotation.processing;version="1.6", \
  javax.crypto, \
  javax.crypto.interfaces, \
  javax.crypto.spec, \
@@ -182,7 +182,7 @@ jre-1.6= \
  org.w3c.dom.xpath, \
  org.xml.sax, \
  org.xml.sax.ext, \
- org.xml.sax.helpers,  \
+ org.xml.sax.helpers, \
  sun.misc
 
 # Standard package set.  Note that:
@@ -191,8 +191,8 @@ jre-1.7= \
  javax.accessibility, \
  javax.activation;version="1.1", \
  javax.activity, \
- javax.annotation;version="1.2", \
- javax.annotation.processing;version="1.2", \
+ javax.annotation;version="1.0", \
+ javax.annotation.processing;version="1.6", \
  javax.crypto, \
  javax.crypto.interfaces, \
  javax.crypto.spec, \
@@ -346,15 +346,15 @@ jre-1.7= \
  org.w3c.dom.xpath, \
  org.xml.sax, \
  org.xml.sax.ext, \
- org.xml.sax.helpers,  \
+ org.xml.sax.helpers, \
  sun.misc
 
 jre-1.8= \
  javax.accessibility, \
  javax.activation;version="1.1", \
  javax.activity, \
- javax.annotation;version="1.2", \
- javax.annotation.processing;version="1.2", \
+ javax.annotation;version="1.0", \
+ javax.annotation.processing;version="1.6", \
  javax.crypto, \
  javax.crypto.interfaces, \
  javax.crypto.spec, \
@@ -465,6 +465,39 @@ jre-1.8= \
  javax.xml.ws.wsaddressing;version="2.2", \
  javax.xml.ws.spi.http;version="2.2", \
  javax.xml.xpath, \
+ javafx.animation, \
+ javafx.application, \
+ javafx.beans, \
+ javafx.beans.binding, \
+ javafx.beans.property, \
+ javafx.beans.property.adapter, \
+ javafx.beans.value, \
+ javafx.collections, \
+ javafx.concurrent, \
+ javafx.css, \
+ javafx.embed.swing, \
+ javafx.embed.swt, \
+ javafx.event, \
+ javafx.fxml, \
+ javafx.geometry, \
+ javafx.scene, \
+ javafx.scene.canvas, \
+ javafx.scene.chart, \
+ javafx.scene.control, \
+ javafx.scene.control.cell, \
+ javafx.scene.effect, \
+ javafx.scene.image, \
+ javafx.scene.input, \
+ javafx.scene.layout, \
+ javafx.scene.media, \
+ javafx.scene.paint, \
+ javafx.scene.shape, \
+ javafx.scene.text, \
+ javafx.scene.transform, \
+ javafx.scene.web, \
+ javafx.stage, \
+ javafx.util, \
+ javafx.util.converter, \
  org.ietf.jgss, \
  org.omg.CORBA, \
  org.omg.CORBA_2_3, \
@@ -508,5 +541,5 @@ jre-1.8= \
  org.w3c.dom.xpath, \
  org.xml.sax, \
  org.xml.sax.ext, \
- org.xml.sax.helpers,  \
+ org.xml.sax.helpers, \
  sun.misc
index 9c02a9d..68238bb 100644 (file)
           <type>xml</type>
           <scope>runtime</scope>
       </dependency>
-    <dependency>
-      <groupId>org.opendaylight.controller</groupId>
-      <artifactId>features-flow</artifactId>
-      <classifier>features</classifier>
-      <type>xml</type>
-      <scope>runtime</scope>
-    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>features-restconf</artifactId>
index c115dcd..d0b9f48 100644 (file)
                 <artifactId>sal-remoterpc-connector</artifactId>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.opendaylight.controller</groupId>
+                <artifactId>sal-akka-raft</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.opendaylight.controller</groupId>
+                <artifactId>sal-akka-raft</artifactId>
+                <version>${project.version}</version>
+                <type>test-jar</type>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.opendaylight.controller</groupId>
+                <artifactId>sal-akka-raft-example</artifactId>
+                <version>${project.version}</version>
+            </dependency>
 
             <!-- OpenFlow stuff -->
             <dependency>
                 <artifactId>model-flow-statistics</artifactId>
                 <version>${project.version}</version>
             </dependency>
-            <dependency>
-                <groupId>org.opendaylight.controller</groupId>
-                <artifactId>features-flow</artifactId>
-                <version>${project.version}</version>
-                <classifier>features</classifier>
-                <type>xml</type>
-                <scope>runtime</scope>
-            </dependency>
 
             <!-- RESTCONF -->
             <dependency>
index 85d1a1b..e26502f 100644 (file)
@@ -7,17 +7,37 @@
  */
 package org.opendaylight.controller.config.yang.messagebus.app.impl;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.mock;
-
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.config.api.DependencyResolver;
+import org.opendaylight.controller.config.api.JmxAttribute;
 import org.opendaylight.controller.config.api.ModuleIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.MountPointService;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.sal.core.api.Provider;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.osgi.framework.BundleContext;
 
+import javax.management.ObjectName;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
 public class MessageBusAppImplModuleTest {
 
     MessageBusAppImplModule messageBusAppImplModule;
@@ -55,5 +75,34 @@ public class MessageBusAppImplModuleTest {
         assertEquals("Set and/or get method/s don't work correctly.", bundleContext, messageBusAppImplModule.getBundleContext());
     }
 
-    //TODO: create MessageBusAppImplModule.createInstance test
-}
+    @Test
+    public void createInstanceTest() throws Exception{
+        org.opendaylight.controller.sal.binding.api.BindingAwareBroker bindingAwareBrokerMock = mock(org.opendaylight.controller.sal.binding.api.BindingAwareBroker.class);
+        Broker brokerMock = mock(Broker.class);
+        doReturn(brokerMock).when(dependencyResolverMock).resolveInstance(eq(org.opendaylight.controller.sal.core.api.Broker.class), any(ObjectName.class), any(JmxAttribute.class));
+        doReturn(bindingAwareBrokerMock).when(dependencyResolverMock).resolveInstance(eq(org.opendaylight.controller.sal.binding.api.BindingAwareBroker.class), any(ObjectName.class), any(JmxAttribute.class));
+        messageBusAppImplModule.resolveDependencies();
+
+        BindingAwareBroker.ProviderContext providerContext = mock(BindingAwareBroker.ProviderContext.class);
+        doReturn(providerContext).when(bindingAwareBrokerMock).registerProvider(any(BindingAwareProvider.class));
+        Broker.ProviderSession providerSessionMock = mock(Broker.ProviderSession.class);
+        doReturn(providerSessionMock).when(brokerMock).registerProvider(any(Provider.class));
+        DataBroker dataBrokerMock = mock(DataBroker.class);
+        doReturn(dataBrokerMock).when(providerContext).getSALService(eq(DataBroker.class));
+        DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class);
+        doReturn(domNotificationPublishServiceMock).when(providerSessionMock).getService(DOMNotificationPublishService.class);
+        DOMMountPointService domMountPointServiceMock = mock(DOMMountPointService.class);
+        doReturn(domMountPointServiceMock).when(providerSessionMock).getService(DOMMountPointService.class);
+        MountPointService mountPointServiceMock = mock(MountPointService.class);
+        doReturn(mountPointServiceMock).when(providerContext).getSALService(eq(MountPointService.class));
+        RpcProviderRegistry rpcProviderRegistryMock = mock(RpcProviderRegistry.class);
+        doReturn(rpcProviderRegistryMock).when(providerContext).getSALService(eq(RpcProviderRegistry.class));
+
+        WriteTransaction writeTransactionMock = mock(WriteTransaction.class);
+        doReturn(writeTransactionMock).when(dataBrokerMock).newWriteOnlyTransaction();
+        doNothing().when(writeTransactionMock).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(DataObject.class), eq(true));
+
+        assertNotNull("EventSourceRegistryWrapper has not been created correctly.", messageBusAppImplModule.createInstance());
+    }
+
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceRegistrationImplTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceRegistrationImplTest.java
new file mode 100644 (file)
index 0000000..9cce623
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.app.impl;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.messagebus.spi.EventSource;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class EventSourceRegistrationImplTest {
+
+    EventSourceRegistrationImplLocal eventSourceRegistrationImplLocal;
+    EventSourceTopology eventSourceTopologyMock;
+
+    @BeforeClass
+    public static void initTestClass() throws IllegalAccessException, InstantiationException {
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        EventSource eventSourceMock = mock(EventSource.class);
+        eventSourceTopologyMock = mock(EventSourceTopology.class);
+        eventSourceRegistrationImplLocal = new EventSourceRegistrationImplLocal(eventSourceMock, eventSourceTopologyMock);
+    }
+
+    @Test
+    public void removeRegistrationTest() {
+        eventSourceRegistrationImplLocal.removeRegistration();
+        verify(eventSourceTopologyMock, times(1)).unRegister(any(EventSource.class));
+    }
+
+
+    private class EventSourceRegistrationImplLocal extends EventSourceRegistrationImpl{
+
+        /**
+         * @param instance            of EventSource that has been registered by {@link EventSourceRegistryImpl#registerEventSource(Node, org.opendaylight.controller.messagebus.spi.EventSource)}
+         * @param eventSourceTopology
+         */
+        public EventSourceRegistrationImplLocal(EventSource instance, EventSourceTopology eventSourceTopology) {
+            super(instance, eventSourceTopology);
+        }
+    }
+
+}
\ No newline at end of file
index f369a12..9f513c4 100644 (file)
@@ -74,7 +74,7 @@ public class EventSourceTopicTest {
 
         nodeIdMock = mock(NodeId.class);
         doReturn(nodeIdMock).when(dataObjectMock).getId();
-        doReturn("0").when(nodeIdMock).getValue();
+        doReturn("nodeIdPattern1").when(nodeIdMock).getValue();
     }
 
     @Test
@@ -84,4 +84,4 @@ public class EventSourceTopicTest {
         verify(eventSourceServiceMock, times(1)).joinTopic(any(JoinTopicInput.class));
     }
 
-}
+}
\ No newline at end of file
index ced2e1f..50ae4d9 100644 (file)
@@ -16,13 +16,16 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -57,6 +60,7 @@ public class EventSourceTopologyTest {
     CreateTopicInput createTopicInputMock;
     ListenerRegistration listenerRegistrationMock;
     NodeKey nodeKey;
+    RpcRegistration<EventAggregatorService> aggregatorRpcReg;
 
     @BeforeClass
     public static void initTestClass() throws IllegalAccessException, InstantiationException {
@@ -76,7 +80,7 @@ public class EventSourceTopologyTest {
     }
 
     private void constructorTestHelper(){
-        RpcRegistration<EventAggregatorService> aggregatorRpcReg = mock(RpcRegistration.class);
+        aggregatorRpcReg = mock(RpcRegistration.class);
         EventSourceService eventSourceService = mock(EventSourceService.class);
         doReturn(aggregatorRpcReg).when(rpcProviderRegistryMock).addRpcImplementation(eq(EventAggregatorService.class), any(EventSourceTopology.class));
         doReturn(eventSourceService).when(rpcProviderRegistryMock).getRpcService(EventSourceService.class);
@@ -87,11 +91,11 @@ public class EventSourceTopologyTest {
         doReturn(checkedFutureMock).when(writeTransactionMock).submit();
     }
 
-//TODO: create test for createTopic
-//    public void createTopicTest() throws Exception{
-//        createTopicTestHelper();
-//        assertNotNull("Topic has not been created correctly.", eventSourceTopology.createTopic(createTopicInputMock));
-//    }
+    @Test
+    public void createTopicTest() throws Exception{
+        topicTestHelper();
+        assertNotNull("Topic has not been created correctly.", eventSourceTopology.createTopic(createTopicInputMock));
+    }
 
     private void topicTestHelper() throws Exception{
         constructorTestHelper();
@@ -138,6 +142,19 @@ public class EventSourceTopologyTest {
         assertNotNull("Instance has not been created correctly.", eventSourceTopology.destroyTopic(destroyTopicInput));
     }
 
+    @Test
+    public void closeTest() throws Exception{
+        constructorTestHelper();
+        topicTestHelper();
+        Map<DataChangeListener, ListenerRegistration<DataChangeListener>> localMap = getTopicListenerRegistrations();
+        DataChangeListener dataChangeListenerMock = mock(DataChangeListener.class);
+        ListenerRegistration<DataChangeListener> listenerListenerRegistrationMock = (ListenerRegistration<DataChangeListener>) mock(ListenerRegistration.class);
+        localMap.put(dataChangeListenerMock, listenerListenerRegistrationMock);
+        eventSourceTopology.close();
+        verify(aggregatorRpcReg, times(1)).close();
+        verify(listenerListenerRegistrationMock, times(1)).close();
+    }
+
     @Test
     public void registerTest() throws Exception {
         topicTestHelper();
@@ -154,4 +171,46 @@ public class EventSourceTopologyTest {
         verify(routedRpcRegistrationMock, times(1)).registerPath(eq(NodeContext.class), any(KeyedInstanceIdentifier.class));
     }
 
-}
+    @Test
+    public void unregisterTest() throws Exception {
+        topicTestHelper();
+        EventSource eventSourceMock = mock(EventSource.class);
+        NodeId nodeId = new NodeId("nodeIdValue1");
+        nodeKey = new NodeKey(nodeId);
+        Map<NodeKey, BindingAwareBroker.RoutedRpcRegistration<EventSourceService>> localMap = getRoutedRpcRegistrations();
+        NodeKey nodeKeyMock = mock(NodeKey.class);
+        doReturn(nodeKeyMock).when(eventSourceMock).getSourceNodeKey();
+        BindingAwareBroker.RoutedRpcRegistration<EventSourceService> routedRpcRegistrationMock = (BindingAwareBroker.RoutedRpcRegistration<EventSourceService>) mock(BindingAwareBroker.RoutedRpcRegistration.class);
+        localMap.put(nodeKeyMock, routedRpcRegistrationMock);
+        eventSourceTopology.unRegister(eventSourceMock);
+        verify(routedRpcRegistrationMock, times(1)).close();
+    }
+
+    @Test
+    public void registerEventSourceTest() throws Exception {
+        topicTestHelper();
+        Node nodeMock = mock(Node.class);
+        EventSource eventSourceMock = mock(EventSource.class);
+        NodeId nodeId = new NodeId("nodeIdValue1");
+        nodeKey = new NodeKey(nodeId);
+        doReturn(nodeKey).when(nodeMock).getKey();
+        doReturn(nodeKey).when(eventSourceMock).getSourceNodeKey();
+        BindingAwareBroker.RoutedRpcRegistration routedRpcRegistrationMock = mock(BindingAwareBroker.RoutedRpcRegistration.class);
+        doReturn(routedRpcRegistrationMock).when(rpcProviderRegistryMock).addRoutedRpcImplementation(EventSourceService.class, eventSourceMock);
+        doNothing().when(routedRpcRegistrationMock).registerPath(eq(NodeContext.class), any(KeyedInstanceIdentifier.class));
+        assertNotNull("Return value has not been created correctly.", eventSourceTopology.registerEventSource(eventSourceMock));
+    }
+
+    private Map getTopicListenerRegistrations() throws Exception{
+        Field nesField = EventSourceTopology.class.getDeclaredField("topicListenerRegistrations");
+        nesField.setAccessible(true);
+        return (Map) nesField.get(eventSourceTopology);
+    }
+
+    private Map getRoutedRpcRegistrations() throws Exception{
+        Field nesField = EventSourceTopology.class.getDeclaredField("routedRpcRegistrations");
+        nesField.setAccessible(true);
+        return (Map) nesField.get(eventSourceTopology);
+    }
+
+}
\ No newline at end of file
index 61fa30f..1d6b825 100644 (file)
@@ -84,11 +84,11 @@ public class NetconfEventSourceManagerTest {
 
         netconfEventSourceManager =
                 NetconfEventSourceManager.create(dataBrokerMock,
-                                                 domNotificationPublishServiceMock,
-                                                 domMountPointServiceMock,
-                                                 mountPointServiceMock,
-                                                 eventSourceRegistry,
-                                                 namespaceToStreamList);
+                        domNotificationPublishServiceMock,
+                        domMountPointServiceMock,
+                        mountPointServiceMock,
+                        eventSourceRegistry,
+                        namespaceToStreamList);
     }
 
     @Test
@@ -125,12 +125,14 @@ public class NetconfEventSourceManagerTest {
         Map<InstanceIdentifier, DataObject> mapUpdate = new HashMap<>();
         InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class);
         Node dataObjectMock = mock(Node.class);
+
         if(create){
             mapCreate.put(instanceIdentifierMock, dataObjectMock);
         }
         if(update){
             mapUpdate.put(instanceIdentifierMock, dataObjectMock);
         }
+
         doReturn(mapCreate).when(asyncDataChangeEventMock).getCreatedData();
         doReturn(mapUpdate).when(asyncDataChangeEventMock).getUpdatedData();
         NetconfNode netconfNodeMock = mock(NetconfNode.class);
@@ -171,4 +173,4 @@ public class NetconfEventSourceManagerTest {
         doReturn(esrMock).when(eventSourceRegistry).registerEventSource(any(EventSource.class));
     }
 
-}
+}
\ No newline at end of file
index 58da9e3..ed90257 100644 (file)
@@ -7,22 +7,8 @@
  */
 package org.opendaylight.controller.messagebus.app.impl;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.lang.reflect.Field;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.md.sal.binding.api.BindingService;
@@ -53,21 +39,32 @@ import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 public class NetconfEventSourceTest {
 
     NetconfEventSource netconfEventSource;
     DOMMountPoint domMountPointMock;
     JoinTopicInput joinTopicInputMock;
-    AsyncDataChangeEvent asyncDataChangeEventMock;
-    Node dataObjectMock;
 
     @Before
     public void setUp() throws Exception {
         Map<String, String> streamMap = new HashMap<>();
-        streamMap.put("string1", "string2");
+        streamMap.put("uriStr1", "string2");
         domMountPointMock = mock(DOMMountPoint.class);
         DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class);
         MountPoint mountPointMock = mock(MountPoint.class);
@@ -80,9 +77,9 @@ public class NetconfEventSourceTest {
         doReturn(rpcConsumerRegistryMock).when(onlyOptionalMock).get();
         doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class);
         org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node node
-            = mock(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node.class);
+                = mock(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node.class);
         org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId nodeId
-            = new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId("NodeId1");
+                = new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId("NodeId1");
         doReturn(nodeId).when(node).getNodeId();
         netconfEventSource = new NetconfEventSource(node, streamMap, domMountPointMock, domNotificationPublishServiceMock, mountPointMock);
     }
@@ -143,7 +140,7 @@ public class NetconfEventSourceTest {
         doReturn(topicId).when(joinTopicInputMock).getTopicId();
         NotificationPattern notificationPatternMock = mock(NotificationPattern.class);
         doReturn(notificationPatternMock).when(joinTopicInputMock).getNotificationPattern();
-        doReturn("regexString1").when(notificationPatternMock).getValue();
+        doReturn("uriStr1").when(notificationPatternMock).getValue();
 
         SchemaContext schemaContextMock = mock(SchemaContext.class);
         doReturn(schemaContextMock).when(domMountPointMock).getSchemaContext();
@@ -165,6 +162,13 @@ public class NetconfEventSourceTest {
         doReturn(domNotificationServiceMock).when(domNotificationServiceOptionalMock).get();
         ListenerRegistration listenerRegistrationMock = mock(ListenerRegistration.class);
         doReturn(listenerRegistrationMock).when(domNotificationServiceMock).registerNotificationListener(any(NetconfEventSource.class), any(List.class));
+
+        Optional<DOMService> optionalMock = (Optional<DOMService>) mock(Optional.class);
+        doReturn(optionalMock).when(domMountPointMock).getService(DOMRpcService.class);
+        DOMRpcService domRpcServiceMock = mock(DOMRpcService.class);
+        doReturn(domRpcServiceMock).when(optionalMock).get();
+        CheckedFuture checkedFutureMock = mock(CheckedFuture.class);
+        doReturn(checkedFutureMock).when(domRpcServiceMock).invokeRpc(any(SchemaPath.class), any(ContainerNode.class));
     }
 
 //TODO: create Test for NetConfEventSource#onNotification
@@ -175,4 +179,4 @@ public class NetconfEventSourceTest {
         return (Set) nesField.get(netconfEventSource);
     }
 
-}
+}
\ No newline at end of file
index 6dacb97..b3f6438 100644 (file)
@@ -57,4 +57,4 @@ public class TopicDOMNotificationTest {
         String bodyString = "TopicDOMNotification [body=" + containerNodeBodyMockToString + "]";
         assertEquals("String has not been created correctly.", bodyString, topicDOMNotification.toString());
     }
-}
+}
\ No newline at end of file
index c058765..bf0a082 100644 (file)
@@ -56,6 +56,7 @@
     <module>sal-rest-docgen-maven</module>
 
     <module>sal-akka-raft</module>
+    <module>sal-akka-raft-example</module>
 
     <!--InMemory DOM DataStore-->
     <module>sal-inmemory-datastore</module>
diff --git a/opendaylight/md-sal/sal-akka-raft-example/pom.xml b/opendaylight/md-sal/sal-akka-raft-example/pom.xml
new file mode 100644 (file)
index 0000000..46fae55
--- /dev/null
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.opendaylight.controller</groupId>
+    <artifactId>sal-parent</artifactId>
+    <version>1.2.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>sal-akka-raft-example</artifactId>
+  <packaging>bundle</packaging>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-akka-raft</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.felix</groupId>
+        <artifactId>maven-bundle-plugin</artifactId>
+        <extensions>true</extensions>
+        <configuration>
+          <instructions>
+            <Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
+          </instructions>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <scm>
+    <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
+    <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+    <tag>HEAD</tag>
+    <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL:Architecture:Clustering</url>
+  </scm>
+</project>
@@ -19,6 +19,7 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.HashMap;
 import java.util.Map;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
 import org.opendaylight.controller.cluster.example.messages.PrintRole;
@@ -26,6 +27,8 @@ import org.opendaylight.controller.cluster.example.messages.PrintState;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
+import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
@@ -34,9 +37,9 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payloa
 /**
  * A sample actor showing how the RaftActor is to be extended
  */
-public class ExampleActor extends RaftActor {
+public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
 
-    private final Map<String, String> state = new HashMap();
+    private final Map<String, String> state = new HashMap<>();
 
     private long persistIdentifier = 1;
     private final Optional<ActorRef> roleChangeNotifier;
@@ -118,7 +121,8 @@ public class ExampleActor extends RaftActor {
         }
     }
 
-    @Override protected void createSnapshot() {
+    @Override
+    public void createSnapshot(ActorRef actorRef) {
         ByteString bs = null;
         try {
             bs = fromObject(state);
@@ -128,15 +132,16 @@ public class ExampleActor extends RaftActor {
         getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null);
     }
 
-    @Override protected void applySnapshot(byte [] snapshot) {
+    @Override
+    public void applySnapshot(byte [] snapshot) {
         state.clear();
         try {
-            state.putAll((HashMap) toObject(snapshot));
+            state.putAll((HashMap<String, String>) toObject(snapshot));
         } catch (Exception e) {
            LOG.error("Exception in applying snapshot", e);
         }
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size());
+            LOG.debug("Snapshot applied to state : {}", ((HashMap<?, ?>) state).size());
         }
     }
 
@@ -192,22 +197,33 @@ public class ExampleActor extends RaftActor {
     }
 
     @Override
-    protected void startLogRecoveryBatch(int maxBatchSize) {
+    @Nonnull
+    protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
+        return this;
+    }
+
+    @Override
+    public void startLogRecoveryBatch(int maxBatchSize) {
+    }
+
+    @Override
+    public void appendRecoveredLogEntry(Payload data) {
     }
 
     @Override
-    protected void appendRecoveredLogEntry(Payload data) {
+    public void applyCurrentLogRecoveryBatch() {
     }
 
     @Override
-    protected void applyCurrentLogRecoveryBatch() {
+    public void onRecoveryComplete() {
     }
 
     @Override
-    protected void onRecoveryComplete() {
+    public void applyRecoverySnapshot(byte[] snapshot) {
     }
 
     @Override
-    protected void applyRecoverySnapshot(byte[] snapshot) {
+    protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+        return this;
     }
 }
@@ -53,8 +53,8 @@ public class KeyValue extends Payload implements Serializable {
     }
 
     // override this method to return  the protobuff related extension fields and their values
-    @Override public Map<GeneratedMessage.GeneratedExtension, String> encode() {
-        Map<GeneratedMessage.GeneratedExtension, String> map = new HashMap<>();
+    @Override public Map<GeneratedMessage.GeneratedExtension<?, ?>, String> encode() {
+        Map<GeneratedMessage.GeneratedExtension<?, ?>, String> map = new HashMap<>();
         map.put(KeyValueMessages.key, getKey());
         map.put(KeyValueMessages.value, getValue());
         return map;
index 1aecc89..b4b2afb 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -19,7 +20,7 @@ import java.util.List;
 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     // We define this as ArrayList so we can use ensureCapacity.
-    protected ArrayList<ReplicatedLogEntry> journal;
+    private ArrayList<ReplicatedLogEntry> journal;
 
     private long snapshotIndex = -1;
     private long snapshotTerm = -1;
@@ -28,13 +29,17 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     private ArrayList<ReplicatedLogEntry> snapshottedJournal;
     private long previousSnapshotIndex = -1;
     private long previousSnapshotTerm = -1;
-    protected int dataSize = 0;
+    private int dataSize = 0;
 
     public AbstractReplicatedLogImpl(long snapshotIndex,
         long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
         this.snapshotIndex = snapshotIndex;
         this.snapshotTerm = snapshotTerm;
         this.journal = new ArrayList<>(unAppliedEntries);
+
+        for(ReplicatedLogEntry entry: journal) {
+            dataSize += entry.size();
+        }
     }
 
     public AbstractReplicatedLogImpl() {
@@ -90,18 +95,26 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     }
 
     @Override
-    public void removeFrom(long logEntryIndex) {
+    public long removeFrom(long logEntryIndex) {
         int adjustedIndex = adjustedIndex(logEntryIndex);
         if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
             // physical index should be less than list size and >= 0
-            return;
+            return -1;
+        }
+
+        for(int i = adjustedIndex; i < journal.size(); i++) {
+            dataSize -= journal.get(i).size();
         }
+
         journal.subList(adjustedIndex , journal.size()).clear();
+
+        return adjustedIndex;
     }
 
     @Override
     public void append(ReplicatedLogEntry replicatedLogEntry) {
         journal.add(replicatedLogEntry);
+        dataSize += replicatedLogEntry.size();
     }
 
     @Override
@@ -196,7 +209,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         List<ReplicatedLogEntry> snapshotJournalEntries = journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
 
         snapshottedJournal.addAll(snapshotJournalEntries);
-        clear(0, (int) (snapshotCapturedIndex - snapshotIndex));
+        snapshotJournalEntries.clear();
 
         previousSnapshotIndex = snapshotIndex;
         setSnapshotIndex(snapshotCapturedIndex);
@@ -230,4 +243,9 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         snapshotTerm = previousSnapshotTerm;
         previousSnapshotTerm = -1;
     }
+
+    @VisibleForTesting
+    ReplicatedLogEntry getAtPhysicalIndex(int index) {
+        return journal.get(index);
+    }
 }
index 1c30fe2..157a53e 100644 (file)
@@ -1,5 +1,6 @@
 /*
  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -11,15 +12,10 @@ package org.opendaylight.controller.cluster.raft;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.japi.Procedure;
-import akka.persistence.RecoveryCompleted;
-import akka.persistence.SaveSnapshotFailure;
-import akka.persistence.SaveSnapshotSuccess;
-import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import java.io.Serializable;
@@ -27,6 +23,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
 import org.apache.commons.lang3.time.DurationFormatUtils;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
@@ -36,11 +33,7 @@ import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersisten
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior;
@@ -99,8 +92,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
 
-    private static final String COMMIT_SNAPSHOT = "commit_snapshot";
-
     protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
     /**
@@ -117,11 +108,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private final DelegatingPersistentDataProvider delegatingPersistenceProvider = new DelegatingPersistentDataProvider(null);
 
-    private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();
-
-    private Stopwatch recoveryTimer;
+    private RaftActorRecoverySupport raftRecovery;
 
-    private int currentRecoveryBatchCount;
+    private RaftActorSnapshotMessageSupport snapshotSupport;
 
     private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
 
@@ -135,17 +124,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         context = new RaftActorContextImpl(this.getSelf(),
             this.getContext(), id, new ElectionTermImpl(delegatingPersistenceProvider, id, LOG),
             -1, -1, peerAddresses,
-            (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()), LOG);
+            (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
+            delegatingPersistenceProvider, LOG);
 
         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, delegatingPersistenceProvider, currentBehavior));
     }
 
-    private void initRecoveryTimer() {
-        if(recoveryTimer == null) {
-            recoveryTimer = Stopwatch.createStarted();
-        }
-    }
-
     @Override
     public void preStart() throws Exception {
         LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(),
@@ -156,7 +140,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     @Override
     public void postStop() {
-        if(currentBehavior != null) {
+        if(currentBehavior.getDelegate() != null) {
             try {
                 currentBehavior.close();
             } catch (Exception e) {
@@ -169,134 +153,32 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     @Override
     public void handleRecover(Object message) {
-        if(persistence().isRecoveryApplicable()) {
-            if (message instanceof SnapshotOffer) {
-                onRecoveredSnapshot((SnapshotOffer) message);
-            } else if (message instanceof ReplicatedLogEntry) {
-                onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
-            } else if (message instanceof ApplyLogEntries) {
-                // Handle this message for backwards compatibility with pre-Lithium versions.
-                onRecoveredApplyLogEntries(((ApplyLogEntries) message).getToIndex());
-            } else if (message instanceof ApplyJournalEntries) {
-                onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
-            } else if (message instanceof DeleteEntries) {
-                replicatedLog().removeFrom(((DeleteEntries) message).getFromIndex());
-            } else if (message instanceof UpdateElectionTerm) {
-                context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
-                        ((UpdateElectionTerm) message).getVotedFor());
-            } else if (message instanceof RecoveryCompleted) {
-                onRecoveryCompletedMessage();
-            }
-        } else {
-            if (message instanceof RecoveryCompleted) {
+        if(raftRecovery == null) {
+            raftRecovery = newRaftActorRecoverySupport();
+        }
+
+        boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message);
+        if(recoveryComplete) {
+            if(!persistence().isRecoveryApplicable()) {
                 // Delete all the messages from the akka journal so that we do not end up with consistency issues
                 // Note I am not using the dataPersistenceProvider and directly using the akka api here
                 deleteMessages(lastSequenceNr());
 
                 // Delete all the akka snapshots as they will not be needed
                 deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue()));
-
-                onRecoveryComplete();
-
-                initializeBehavior();
             }
-        }
-    }
-
-    private void onRecoveredSnapshot(SnapshotOffer offer) {
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: SnapshotOffer called..", persistenceId());
-        }
-
-        initRecoveryTimer();
-
-        Snapshot snapshot = (Snapshot) offer.snapshot();
-
-        // Create a replicated log with the snapshot information
-        // The replicated log can be used later on to retrieve this snapshot
-        // when we need to install it on a peer
-
-        context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider,
-                currentBehavior));
-        context.setLastApplied(snapshot.getLastAppliedIndex());
-        context.setCommitIndex(snapshot.getLastAppliedIndex());
-
-        Stopwatch timer = Stopwatch.createStarted();
-
-        // Apply the snapshot to the actors state
-        applyRecoverySnapshot(snapshot.getState());
-
-        timer.stop();
-        LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
-                replicatedLog().size(), persistenceId(), timer.toString(),
-                replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm());
-    }
 
-    private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
-        }
-
-        replicatedLog().append(logEntry);
-    }
-
-    private void onRecoveredApplyLogEntries(long toIndex) {
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
-                    persistenceId(), context.getLastApplied() + 1, toIndex);
-        }
-
-        for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
-            batchRecoveredLogEntry(replicatedLog().get(i));
-        }
-
-        context.setLastApplied(toIndex);
-        context.setCommitIndex(toIndex);
-    }
-
-    private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
-        initRecoveryTimer();
+            onRecoveryComplete();
 
-        int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
-        if(currentRecoveryBatchCount == 0) {
-            startLogRecoveryBatch(batchSize);
-        }
-
-        appendRecoveredLogEntry(logEntry.getData());
+            initializeBehavior();
 
-        if(++currentRecoveryBatchCount >= batchSize) {
-            endCurrentLogRecoveryBatch();
+            raftRecovery = null;
         }
     }
 
-    private void endCurrentLogRecoveryBatch() {
-        applyCurrentLogRecoveryBatch();
-        currentRecoveryBatchCount = 0;
-    }
-
-    private void onRecoveryCompletedMessage() {
-        if(currentRecoveryBatchCount > 0) {
-            endCurrentLogRecoveryBatch();
-        }
-
-        onRecoveryComplete();
-
-        String recoveryTime = "";
-        if(recoveryTimer != null) {
-            recoveryTimer.stop();
-            recoveryTime = " in " + recoveryTimer.toString();
-            recoveryTimer = null;
-        }
-
-        LOG.info(
-            "Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
-                "Persistence Id =  " + persistenceId() +
-                " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
-                "journal-size={}",
-            replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
-            replicatedLog().getSnapshotTerm(), replicatedLog().size());
-
-        initializeBehavior();
+    protected RaftActorRecoverySupport newRaftActorRecoverySupport() {
+        return new RaftActorRecoverySupport(delegatingPersistenceProvider, context, currentBehavior,
+                getRaftActorRecoveryCohort());
     }
 
     protected void initializeBehavior(){
@@ -309,7 +191,17 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
     }
 
-    @Override public void handleCommand(Object message) {
+    @Override
+    public void handleCommand(Object message) {
+        if(snapshotSupport == null) {
+            snapshotSupport = newRaftActorSnapshotMessageSupport();
+        }
+
+        boolean handled = snapshotSupport.handleSnapshotMessage(message);
+        if(handled) {
+            return;
+        }
+
         if (message instanceof ApplyState){
             ApplyState applyState = (ApplyState) message;
 
@@ -336,56 +228,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
             persistence().persist(applyEntries, NoopProcedure.instance());
 
-        } else if(message instanceof ApplySnapshot ) {
-            Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: ApplySnapshot called on Follower Actor " +
-                        "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(),
-                    snapshot.getLastAppliedTerm()
-                );
-            }
-
-            applySnapshot(snapshot.getState());
-
-            //clears the followers log, sets the snapshot index to ensure adjusted-index works
-            context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider,
-                    currentBehavior));
-            context.setLastApplied(snapshot.getLastAppliedIndex());
-
         } else if (message instanceof FindLeader) {
             getSender().tell(
                 new FindLeaderReply(getLeaderAddress()),
                 getSelf()
             );
-
-        } else if (message instanceof SaveSnapshotSuccess) {
-            SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
-            LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId());
-
-            long sequenceNumber = success.metadata().sequenceNr();
-
-            commitSnapshot(sequenceNumber);
-
-        } else if (message instanceof SaveSnapshotFailure) {
-            SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
-
-            LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
-                    persistenceId(), saveSnapshotFailure.cause());
-
-            context.getSnapshotManager().rollback();
-
-        } else if (message instanceof CaptureSnapshot) {
-            LOG.debug("{}: CaptureSnapshot received by actor: {}", persistenceId(), message);
-
-            context.getSnapshotManager().create(createSnapshotProcedure);
-
-        } else if (message instanceof CaptureSnapshotReply) {
-            handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
         } else if(message instanceof GetOnDemandRaftState) {
             onGetOnDemandRaftStats();
-        } else if (message.equals(COMMIT_SNAPSHOT)) {
-            commitSnapshot(-1);
         } else {
             reusableBehaviorStateHolder.init(getCurrentBehavior());
 
@@ -395,6 +244,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
+    protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
+        return new RaftActorSnapshotMessageSupport(delegatingPersistenceProvider, context,
+                currentBehavior, getRaftActorSnapshotCohort());
+    }
+
     private void onGetOnDemandRaftStats() {
         // Debugging message to retrieve raft stats.
 
@@ -621,7 +475,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                     // Make saving Snapshot successful
                     // Committing the snapshot here would end up calling commit in the creating state which would
                     // be a state violation. That's why now we send a message to commit the snapshot.
-                    self().tell(COMMIT_SNAPSHOT, self());
+                    self().tell(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT, self());
                 }
             });
         }
@@ -645,10 +499,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         context.setPeerAddress(peerId, peerAddress);
     }
 
-    protected void commitSnapshot(long sequenceNumber) {
-        context.getSnapshotManager().commit(persistence(), sequenceNumber);
-    }
-
     /**
      * The applyState method will be called by the RaftActor when some data
      * needs to be applied to the actor's state
@@ -670,31 +520,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         Object data);
 
     /**
-     * This method is called during recovery at the start of a batch of state entries. Derived
-     * classes should perform any initialization needed to start a batch.
+     * Returns the RaftActorRecoveryCohort to participate in persistence recovery.
      */
-    protected abstract void startLogRecoveryBatch(int maxBatchSize);
-
-    /**
-     * This method is called during recovery to append state data to the current batch. This method
-     * is called 1 or more times after {@link #startLogRecoveryBatch}.
-     *
-     * @param data the state data
-     */
-    protected abstract void appendRecoveredLogEntry(Payload data);
-
-    /**
-     * This method is called during recovery to reconstruct the state of the actor.
-     *
-     * @param snapshotBytes A snapshot of the state of the actor
-     */
-    protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
-
-    /**
-     * This method is called during recovery at the end of a batch to apply the current batched
-     * log entries. This method is called after {@link #appendRecoveredLogEntry}.
-     */
-    protected abstract void applyCurrentLogRecoveryBatch();
+    @Nonnull
+    protected abstract RaftActorRecoveryCohort getRaftActorRecoveryCohort();
 
     /**
      * This method is called when recovery is complete.
@@ -702,24 +531,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     protected abstract void onRecoveryComplete();
 
     /**
-     * This method will be called by the RaftActor when a snapshot needs to be
-     * created. The derived actor should respond with its current state.
-     * <p/>
-     * During recovery the state that is returned by the derived actor will
-     * be passed back to it by calling the applySnapshot  method
-     *
-     * @return The current state of the actor
+     * Returns the RaftActorSnapshotCohort to participate in persistence recovery.
      */
-    protected abstract void createSnapshot();
-
-    /**
-     * This method can be called at any other point during normal
-     * operations when the derived actor is out of sync with it's peers
-     * and the only way to bring it in sync is by applying a snapshot
-     *
-     * @param snapshotBytes A snapshot of the state of the actor
-     */
-    protected abstract void applySnapshot(byte[] snapshotBytes);
+    @Nonnull
+    protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort();
 
     /**
      * This method will be called by the RaftActor when the state of the
@@ -753,16 +568,16 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         return peerAddress;
     }
 
-    private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
-        LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
-
-        context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, context.getTotalMemory());
-    }
-
     protected boolean hasFollowers(){
         return getRaftActorContext().hasFollowers();
     }
 
+    /**
+     * @deprecated Deprecated in favor of {@link org.opendaylight.controller.cluster.raft.base.messages.DeleteEntriesTest}
+     *             whose type for fromIndex is long instead of int. This class was kept for backwards
+     *             compatibility with Helium.
+     */
+    @Deprecated
     static class DeleteEntries implements Serializable {
         private static final long serialVersionUID = 1L;
         private final int fromIndex;
@@ -795,14 +610,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
-    private class CreateSnapshotProcedure implements Procedure<Void> {
-
-        @Override
-        public void apply(Void aVoid) throws Exception {
-            createSnapshot();
-        }
-    }
-
     private static class BehaviorStateHolder {
         private RaftActorBehavior behavior;
         private String leaderId;
index 9f4b7cb..7198876 100644 (file)
@@ -15,6 +15,7 @@ import akka.actor.Props;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 import java.util.Map;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.slf4j.Logger;
 
 /**
@@ -170,6 +171,8 @@ public interface RaftActorContext {
 
     SnapshotManager getSnapshotManager();
 
+    DataPersistenceProvider getPersistenceProvider();
+
     boolean hasFollowers();
 
     long getTotalMemory();
index 684845c..049b91c 100644 (file)
@@ -17,6 +17,7 @@ import akka.actor.UntypedActorContext;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 import java.util.Map;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.slf4j.Logger;
 
 public class RaftActorContextImpl implements RaftActorContext {
@@ -48,9 +49,11 @@ public class RaftActorContextImpl implements RaftActorContext {
     // be passed to it in the constructor
     private SnapshotManager snapshotManager;
 
+    private final DataPersistenceProvider persistenceProvider;
+
     public RaftActorContextImpl(ActorRef actor, UntypedActorContext context, String id,
             ElectionTerm termInformation, long commitIndex, long lastApplied, Map<String, String> peerAddresses,
-            ConfigParams configParams, Logger logger) {
+            ConfigParams configParams, DataPersistenceProvider persistenceProvider, Logger logger) {
         this.actor = actor;
         this.context = context;
         this.id = id;
@@ -59,6 +62,7 @@ public class RaftActorContextImpl implements RaftActorContext {
         this.lastApplied = lastApplied;
         this.peerAddresses = peerAddresses;
         this.configParams = configParams;
+        this.persistenceProvider = persistenceProvider;
         this.LOG = logger;
     }
 
@@ -182,4 +186,9 @@ public class RaftActorContextImpl implements RaftActorContext {
     public boolean hasFollowers() {
         return getPeerAddresses().keySet().size() > 0;
     }
+
+    @Override
+    public DataPersistenceProvider getPersistenceProvider() {
+        return persistenceProvider;
+    }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java
new file mode 100644 (file)
index 0000000..a9f00aa
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+
+/**
+ * Interface for a class that participates in raft actor persistence recovery.
+ *
+ * @author Thomas Pantelis
+ */
+public interface RaftActorRecoveryCohort {
+
+    /**
+     * This method is called during recovery at the start of a batch of state entries. Derived
+     * classes should perform any initialization needed to start a batch.
+     */
+    void startLogRecoveryBatch(int maxBatchSize);
+
+    /**
+     * This method is called during recovery to append state data to the current batch. This method
+     * is called 1 or more times after {@link #startLogRecoveryBatch}.
+     *
+     * @param data the state data
+     */
+    void appendRecoveredLogEntry(Payload data);
+
+    /**
+     * This method is called during recovery to reconstruct the state of the actor.
+     *
+     * @param snapshotBytes A snapshot of the state of the actor
+     */
+    void applyRecoverySnapshot(byte[] snapshotBytes);
+
+    /**
+     * This method is called during recovery at the end of a batch to apply the current batched
+     * log entries. This method is called after {@link #appendRecoveredLogEntry}.
+     */
+    void applyCurrentLogRecoveryBatch();
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java
new file mode 100644 (file)
index 0000000..57603a5
--- /dev/null
@@ -0,0 +1,187 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.SnapshotOffer;
+import com.google.common.base.Stopwatch;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+
+/**
+ * Support class that handles persistence recovery for a RaftActor.
+ *
+ * @author Thomas Pantelis
+ */
+class RaftActorRecoverySupport {
+    private final DataPersistenceProvider persistence;
+    private final RaftActorContext context;
+    private final RaftActorBehavior currentBehavior;
+    private final RaftActorRecoveryCohort cohort;
+
+    private int currentRecoveryBatchCount;
+
+    private Stopwatch recoveryTimer;
+    private final Logger log;
+
+    RaftActorRecoverySupport(DataPersistenceProvider persistence, RaftActorContext context,
+            RaftActorBehavior currentBehavior, RaftActorRecoveryCohort cohort) {
+        this.persistence = persistence;
+        this.context = context;
+        this.currentBehavior = currentBehavior;
+        this.cohort = cohort;
+        this.log = context.getLogger();
+    }
+
+    boolean handleRecoveryMessage(Object message) {
+        boolean recoveryComplete = false;
+        if(persistence.isRecoveryApplicable()) {
+            if (message instanceof SnapshotOffer) {
+                onRecoveredSnapshot((SnapshotOffer) message);
+            } else if (message instanceof ReplicatedLogEntry) {
+                onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
+            } else if (message instanceof ApplyLogEntries) {
+                // Handle this message for backwards compatibility with pre-Lithium versions.
+                onRecoveredApplyLogEntries(((ApplyLogEntries) message).getToIndex());
+            } else if (message instanceof ApplyJournalEntries) {
+                onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
+            } else if (message instanceof DeleteEntries) {
+                replicatedLog().removeFrom(((DeleteEntries) message).getFromIndex());
+            } else if (message instanceof org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries) {
+                // Handle this message for backwards compatibility with pre-Lithium versions.
+                replicatedLog().removeFrom(((org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries) message).getFromIndex());
+            } else if (message instanceof UpdateElectionTerm) {
+                context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
+                        ((UpdateElectionTerm) message).getVotedFor());
+            } else if (message instanceof RecoveryCompleted) {
+                onRecoveryCompletedMessage();
+                recoveryComplete = true;
+            }
+        } else if (message instanceof RecoveryCompleted) {
+            recoveryComplete = true;
+        }
+
+        return recoveryComplete;
+    }
+
+    private ReplicatedLog replicatedLog() {
+        return context.getReplicatedLog();
+    }
+
+    private void initRecoveryTimer() {
+        if(recoveryTimer == null) {
+            recoveryTimer = Stopwatch.createStarted();
+        }
+    }
+
+    private void onRecoveredSnapshot(SnapshotOffer offer) {
+        if(log.isDebugEnabled()) {
+            log.debug("{}: SnapshotOffer called..", context.getId());
+        }
+
+        initRecoveryTimer();
+
+        Snapshot snapshot = (Snapshot) offer.snapshot();
+
+        // Create a replicated log with the snapshot information
+        // The replicated log can be used later on to retrieve this snapshot
+        // when we need to install it on a peer
+
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence, currentBehavior));
+        context.setLastApplied(snapshot.getLastAppliedIndex());
+        context.setCommitIndex(snapshot.getLastAppliedIndex());
+
+        Stopwatch timer = Stopwatch.createStarted();
+
+        // Apply the snapshot to the actors state
+        cohort.applyRecoverySnapshot(snapshot.getState());
+
+        timer.stop();
+        log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}",
+                context.getId(), timer.toString(), replicatedLog().getSnapshotIndex(),
+                replicatedLog().getSnapshotTerm(), replicatedLog().size());
+    }
+
+    private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
+        if(log.isDebugEnabled()) {
+            log.debug("{}: Received ReplicatedLogEntry for recovery: index: {}, size: {}", context.getId(),
+                    logEntry.getIndex(), logEntry.size());
+        }
+
+        replicatedLog().append(logEntry);
+    }
+
+    private void onRecoveredApplyLogEntries(long toIndex) {
+        long lastUnappliedIndex = context.getLastApplied() + 1;
+
+        if(log.isDebugEnabled()) {
+            log.debug("{}: Received apply journal entries for recovery, applying to state: {} to {}",
+                    context.getId(), lastUnappliedIndex, toIndex);
+        }
+
+        long lastApplied = lastUnappliedIndex - 1;
+        for (long i = lastUnappliedIndex; i <= toIndex; i++) {
+            ReplicatedLogEntry logEntry = replicatedLog().get(i);
+            if(logEntry != null) {
+                lastApplied++;
+                batchRecoveredLogEntry(logEntry);
+            } else {
+                // Shouldn't happen but cover it anyway.
+                log.error("Log entry not found for index {}", i);
+                break;
+            }
+        }
+
+        context.setLastApplied(lastApplied);
+        context.setCommitIndex(lastApplied);
+    }
+
+    private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
+        initRecoveryTimer();
+
+        int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
+        if(currentRecoveryBatchCount == 0) {
+            cohort.startLogRecoveryBatch(batchSize);
+        }
+
+        cohort.appendRecoveredLogEntry(logEntry.getData());
+
+        if(++currentRecoveryBatchCount >= batchSize) {
+            endCurrentLogRecoveryBatch();
+        }
+    }
+
+    private void endCurrentLogRecoveryBatch() {
+        cohort.applyCurrentLogRecoveryBatch();
+        currentRecoveryBatchCount = 0;
+    }
+
+    private void onRecoveryCompletedMessage() {
+        if(currentRecoveryBatchCount > 0) {
+            endCurrentLogRecoveryBatch();
+        }
+
+        String recoveryTime = "";
+        if(recoveryTimer != null) {
+            recoveryTimer.stop();
+            recoveryTime = " in " + recoveryTimer.toString();
+            recoveryTimer = null;
+        }
+
+        log.info("Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
+                 "Persistence Id =  " + context.getId() +
+                 " Last index in log = {}, snapshotIndex = {}, snapshotTerm = {}, " +
+                 "journal-size = {}", replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
+                 replicatedLog().getSnapshotTerm(), replicatedLog().size());
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotCohort.java
new file mode 100644 (file)
index 0000000..ad68726
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+
+/**
+ * Interface for a class that participates in raft actor snapshotting.
+ *
+ * @author Thomas Pantelis
+ */
+public interface RaftActorSnapshotCohort {
+
+    /**
+     * This method is called by the RaftActor when a snapshot needs to be
+     * created. The implementation should send a CaptureSnapshotReply to the given actor.
+     *
+     * @param actorRef the actor to which to respond
+     */
+    void createSnapshot(ActorRef actorRef);
+
+    /**
+     * This method is called to apply a snapshot installed by the leader.
+     *
+     * @param snapshotBytes a snapshot of the state of the actor
+     */
+    void applySnapshot(byte[] snapshotBytes);
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java
new file mode 100644 (file)
index 0000000..790ff89
--- /dev/null
@@ -0,0 +1,115 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.japi.Procedure;
+import akka.persistence.SaveSnapshotFailure;
+import akka.persistence.SaveSnapshotSuccess;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+
+/**
+ * Handles snapshot related messages for a RaftActor.
+ *
+ * @author Thomas Pantelis
+ */
+class RaftActorSnapshotMessageSupport {
+    static final String COMMIT_SNAPSHOT = "commit_snapshot";
+
+    private final DataPersistenceProvider persistence;
+    private final RaftActorContext context;
+    private final RaftActorBehavior currentBehavior;
+    private final RaftActorSnapshotCohort cohort;
+    private final Logger log;
+
+    private final Procedure<Void> createSnapshotProcedure = new Procedure<Void>() {
+        @Override
+        public void apply(Void notUsed) throws Exception {
+            cohort.createSnapshot(context.getActor());
+        }
+    };
+
+    RaftActorSnapshotMessageSupport(DataPersistenceProvider persistence, RaftActorContext context,
+            RaftActorBehavior currentBehavior, RaftActorSnapshotCohort cohort) {
+        this.persistence = persistence;
+        this.context = context;
+        this.currentBehavior = currentBehavior;
+        this.cohort = cohort;
+        this.log = context.getLogger();
+    }
+
+    boolean handleSnapshotMessage(Object message) {
+        if(message instanceof ApplySnapshot ) {
+            onApplySnapshot(((ApplySnapshot) message).getSnapshot());
+            return true;
+        } else if (message instanceof SaveSnapshotSuccess) {
+            onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
+            return true;
+        } else if (message instanceof SaveSnapshotFailure) {
+            onSaveSnapshotFailure((SaveSnapshotFailure) message);
+            return true;
+        } else if (message instanceof CaptureSnapshot) {
+            onCaptureSnapshot(message);
+            return true;
+        } else if (message instanceof CaptureSnapshotReply) {
+            onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
+            return true;
+        } else if (message.equals(COMMIT_SNAPSHOT)) {
+            context.getSnapshotManager().commit(persistence, -1);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private void onCaptureSnapshotReply(byte[] snapshotBytes) {
+        log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), snapshotBytes.length);
+
+        context.getSnapshotManager().persist(persistence, snapshotBytes, currentBehavior, context.getTotalMemory());
+    }
+
+    private void onCaptureSnapshot(Object message) {
+        log.debug("{}: CaptureSnapshot received by actor: {}", context.getId(), message);
+
+        context.getSnapshotManager().create(createSnapshotProcedure);
+    }
+
+    private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) {
+        log.error("{}: SaveSnapshotFailure received for snapshot Cause:",
+                context.getId(), saveSnapshotFailure.cause());
+
+        context.getSnapshotManager().rollback();
+    }
+
+    private void onSaveSnapshotSuccess(SaveSnapshotSuccess success) {
+        log.info("{}: SaveSnapshotSuccess received for snapshot", context.getId());
+
+        long sequenceNumber = success.metadata().sequenceNr();
+
+        context.getSnapshotManager().commit(persistence, sequenceNumber);
+    }
+
+    private void onApplySnapshot(Snapshot snapshot) {
+        if(log.isDebugEnabled()) {
+            log.debug("{}: ApplySnapshot called on Follower Actor " +
+                    "snapshotIndex:{}, snapshotTerm:{}", context.getId(), snapshot.getLastAppliedIndex(),
+                snapshot.getLastAppliedTerm());
+        }
+
+        cohort.applySnapshot(snapshot.getState());
+
+        //clears the followers log, sets the snapshot index to ensure adjusted-index works
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence,
+                currentBehavior));
+        context.setLastApplied(snapshot.getLastAppliedIndex());
+    }
+}
index 3e4d727..8388eaf 100644 (file)
@@ -51,8 +51,9 @@ public interface ReplicatedLog {
      * information
      *
      * @param index the index of the log entry
+     * @return the adjusted index of the first log entry removed or -1 if log entry not found.
      */
-    void removeFrom(long index);
+    long removeFrom(long index);
 
 
     /**
index fdb6305..1cfe153 100644 (file)
@@ -11,7 +11,7 @@ import akka.japi.Procedure;
 import java.util.Collections;
 import java.util.List;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 
 /**
@@ -27,11 +27,7 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
 
     private final Procedure<DeleteEntries> deleteProcedure = new Procedure<DeleteEntries>() {
         @Override
-        public void apply(DeleteEntries param) {
-            dataSize = 0;
-            for (ReplicatedLogEntry entry : journal) {
-                dataSize += entry.size();
-            }
+        public void apply(DeleteEntries notUsed) {
         }
     };
 
@@ -57,16 +53,11 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
 
     @Override
     public void removeFromAndPersist(long logEntryIndex) {
-        int adjustedIndex = adjustedIndex(logEntryIndex);
-
-        if (adjustedIndex < 0) {
-            return;
-        }
-
         // FIXME: Maybe this should be done after the command is saved
-        journal.subList(adjustedIndex , journal.size()).clear();
-
-        persistence.persist(new DeleteEntries(adjustedIndex), deleteProcedure);
+        long adjustedIndex = removeFrom(logEntryIndex);
+        if(adjustedIndex >= 0) {
+            persistence.persist(new DeleteEntries(adjustedIndex), deleteProcedure);
+        }
     }
 
     @Override
@@ -83,7 +74,7 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
         }
 
         // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
-        journal.add(replicatedLogEntry);
+        append(replicatedLogEntry);
 
         // When persisting events with persist it is guaranteed that the
         // persistent actor will not receive further commands between the
@@ -96,8 +87,7 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
                 public void apply(ReplicatedLogEntry evt) throws Exception {
                     int logEntrySize = replicatedLogEntry.size();
 
-                    dataSize += logEntrySize;
-                    long dataSizeForCheck = dataSize;
+                    long dataSizeForCheck = dataSize();
 
                     dataSizeSinceLastSnapshot += logEntrySize;
 
index 8121f75..f4f936b 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.raft;
 import akka.japi.Procedure;
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.protobuf.ByteString;
+import java.util.List;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -19,7 +20,6 @@ import org.slf4j.Logger;
 
 public class SnapshotManager implements SnapshotState {
 
-
     private final SnapshotState IDLE = new Idle();
     private final SnapshotState CAPTURING = new Capturing();
     private final SnapshotState PERSISTING = new Persisting();
@@ -35,6 +35,7 @@ public class SnapshotManager implements SnapshotState {
 
     private SnapshotState currentState = IDLE;
     private CaptureSnapshot captureSnapshot;
+    private long lastSequenceNumber = -1;
 
     public SnapshotManager(RaftActorContext context, Logger logger) {
         this.context = context;
@@ -184,19 +185,26 @@ public class SnapshotManager implements SnapshotState {
             long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm();
 
             // send a CaptureSnapshot to self to make the expensive operation async.
+
+            List<ReplicatedLogEntry> unAppliedEntries = context.getReplicatedLog().getFrom(lastAppliedIndex + 1);
+
             captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(),
                     lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
-                    newReplicatedToAllIndex, newReplicatedToAllTerm, targetFollower!=null);
+                    newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, targetFollower != null);
 
             SnapshotManager.this.currentState = CAPTURING;
 
-            if(targetFollower != null){
-                LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
-            } else {
+            if(captureSnapshot.isInstallSnapshotInitiated()) {
                 LOG.info("{}: Initiating snapshot capture {} to install on {}",
                         persistenceId(), captureSnapshot, targetFollower);
+            } else {
+                LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
             }
 
+            lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
+
+            LOG.debug("lastSequenceNumber prior to capture: {}", lastSequenceNumber);
+
             context.getActor().tell(captureSnapshot, context.getActor());
 
             return true;
@@ -261,7 +269,7 @@ public class SnapshotManager implements SnapshotState {
             // when snapshot is saved async, SaveSnapshotSuccess is raised.
 
             Snapshot sn = Snapshot.create(snapshotBytes,
-                    context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
+                    captureSnapshot.getUnAppliedEntries(),
                     captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
                     captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
 
@@ -336,8 +344,9 @@ public class SnapshotManager implements SnapshotState {
             persistenceProvider.deleteSnapshots(new SnapshotSelectionCriteria(
                     sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
 
-            persistenceProvider.deleteMessages(sequenceNumber);
+            persistenceProvider.deleteMessages(lastSequenceNumber);
 
+            lastSequenceNumber = -1;
             SnapshotManager.this.currentState = IDLE;
         }
 
index daa8f77..7c182f0 100644 (file)
@@ -8,6 +8,10 @@
 
 package org.opendaylight.controller.cluster.raft.base.messages;
 
+import java.util.Collections;
+import java.util.List;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+
 public class CaptureSnapshot {
     private final long lastAppliedIndex;
     private final long lastAppliedTerm;
@@ -16,14 +20,17 @@ public class CaptureSnapshot {
     private final boolean installSnapshotInitiated;
     private final long replicatedToAllIndex;
     private final long replicatedToAllTerm;
+    private final List<ReplicatedLogEntry> unAppliedEntries;
 
-    public CaptureSnapshot(long lastIndex, long lastTerm,
-        long lastAppliedIndex, long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm) {
-        this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, replicatedToAllIndex , replicatedToAllTerm, false);
+    public CaptureSnapshot(long lastIndex, long lastTerm, long lastAppliedIndex, long lastAppliedTerm,
+            long replicatedToAllIndex, long replicatedToAllTerm, List<ReplicatedLogEntry> unAppliedEntries) {
+        this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, replicatedToAllIndex, replicatedToAllTerm,
+                unAppliedEntries, false);
     }
 
-    public CaptureSnapshot(long lastIndex, long lastTerm,long lastAppliedIndex,
-        long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm, boolean installSnapshotInitiated) {
+    public CaptureSnapshot(long lastIndex, long lastTerm, long lastAppliedIndex,
+            long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm,
+            List<ReplicatedLogEntry> unAppliedEntries, boolean installSnapshotInitiated) {
         this.lastIndex = lastIndex;
         this.lastTerm = lastTerm;
         this.lastAppliedIndex = lastAppliedIndex;
@@ -31,6 +38,7 @@ public class CaptureSnapshot {
         this.installSnapshotInitiated = installSnapshotInitiated;
         this.replicatedToAllIndex = replicatedToAllIndex;
         this.replicatedToAllTerm = replicatedToAllTerm;
+        this.unAppliedEntries = unAppliedEntries != null ? unAppliedEntries : Collections.<ReplicatedLogEntry>emptyList();
     }
 
     public long getLastAppliedIndex() {
@@ -61,6 +69,10 @@ public class CaptureSnapshot {
         return replicatedToAllTerm;
     }
 
+    public List<ReplicatedLogEntry> getUnAppliedEntries() {
+        return unAppliedEntries;
+    }
+
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
@@ -68,7 +80,9 @@ public class CaptureSnapshot {
                 .append(lastAppliedTerm).append(", lastIndex=").append(lastIndex).append(", lastTerm=")
                 .append(lastTerm).append(", installSnapshotInitiated=").append(installSnapshotInitiated)
                 .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append(", replicatedToAllTerm=")
-                .append(replicatedToAllTerm).append("]");
+                .append(replicatedToAllTerm).append(", unAppliedEntries size=").append(unAppliedEntries.size()).append("]");
         return builder.toString();
     }
+
+
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/DeleteEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/DeleteEntries.java
new file mode 100644 (file)
index 0000000..97742c0
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+import java.io.Serializable;
+
+/**
+ * Internal message that is stored in the akka's persistent journal to delete journal entries.
+ *
+ * @author Thomas Pantelis
+ */
+public class DeleteEntries implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final long fromIndex;
+
+    public DeleteEntries(long fromIndex) {
+        this.fromIndex = fromIndex;
+    }
+
+    public long getFromIndex() {
+        return fromIndex;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("DeleteEntries [fromIndex=").append(fromIndex).append("]");
+        return builder.toString();
+    }
+}
index 2c433f9..bdfdd9b 100644 (file)
@@ -460,7 +460,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             long followerNextIndex = followerLogInformation.getNextIndex();
             boolean isFollowerActive = followerLogInformation.isFollowerActive();
             boolean sendAppendEntries = false;
-            List<ReplicatedLogEntry> entries = Collections.EMPTY_LIST;
+            List<ReplicatedLogEntry> entries = Collections.emptyList();
 
             if (mapFollowerToSnapshot.get(followerId) != null) {
                 // if install snapshot is in process , then sent next chunk if possible
index b910313..977cf0e 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
@@ -20,6 +19,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -27,7 +27,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.RaftActorTest.MockRaftActor;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
@@ -52,7 +51,6 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
 
         private final TestActorRef<MessageCollectorActor> collectorActor;
         private final Map<Class<?>, Boolean> dropMessages = new ConcurrentHashMap<>();
-        private volatile byte[] snapshot;
 
         private TestRaftActor(String id, Map<String, String> peerAddresses, ConfigParams config,
                 TestActorRef<MessageCollectorActor> collectorActor) {
@@ -112,20 +110,14 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         }
 
         @Override
-        protected void createSnapshot() {
-            if(snapshot != null) {
-                getSelf().tell(new CaptureSnapshotReply(snapshot), ActorRef.noSender());
+        public void createSnapshot(ActorRef actorRef) {
+            try {
+                actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
+            } catch (Exception e) {
+                e.printStackTrace();
             }
         }
 
-        @Override
-        protected void applyRecoverySnapshot(byte[] bytes) {
-        }
-
-        void setSnapshot(byte[] snapshot) {
-            this.snapshot = snapshot;
-        }
-
         public ActorRef collectorActor() {
             return collectorActor;
         }
@@ -159,6 +151,8 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
     protected long initialTerm = 5;
     protected long currentTerm;
 
+    protected List<Object> expSnapshotState = new ArrayList<>();
+
     @After
     public void tearDown() {
         InMemoryJournal.clear();
@@ -184,7 +178,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
     }
 
     protected void waitUntilLeader(ActorRef actorRef) {
-        RaftActorTest.RaftActorTestKit.waitUntilLeader(actorRef);
+        RaftActorTestKit.waitUntilLeader(actorRef);
     }
 
     protected TestActorRef<TestRaftActor> newTestRaftActor(String id, Map<String, String> peerAddresses,
@@ -216,13 +210,20 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         });
     }
 
+    @SuppressWarnings("unchecked")
     protected void verifySnapshot(String prefix, Snapshot snapshot, long lastAppliedTerm,
-            int lastAppliedIndex, long lastTerm, long lastIndex, byte[] data) {
+            int lastAppliedIndex, long lastTerm, long lastIndex)
+                    throws Exception {
         assertEquals(prefix + " Snapshot getLastAppliedTerm", lastAppliedTerm, snapshot.getLastAppliedTerm());
         assertEquals(prefix + " Snapshot getLastAppliedIndex", lastAppliedIndex, snapshot.getLastAppliedIndex());
         assertEquals(prefix + " Snapshot getLastTerm", lastTerm, snapshot.getLastTerm());
         assertEquals(prefix + " Snapshot getLastIndex", lastIndex, snapshot.getLastIndex());
-        assertArrayEquals(prefix + " Snapshot getState", data, snapshot.getState());
+
+        List<Object> actualState = (List<Object>)MockRaftActor.toObject(snapshot.getState());
+        assertEquals(prefix + " Snapshot getState size", expSnapshotState.size(), actualState.size());
+        for(int i = 0; i < expSnapshotState.size(); i++) {
+            assertEquals(prefix + " Snapshot state " + i, expSnapshotState.get(i), actualState.get(i));
+        }
     }
 
     protected void verifyPersistedJournal(String persistenceId, List<? extends ReplicatedLogEntry> expJournal) {
index 8fdb7ea..d175289 100644 (file)
@@ -1,5 +1,6 @@
 /*
  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -15,7 +16,6 @@ import akka.japi.Procedure;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -40,12 +40,33 @@ public class AbstractReplicatedLogImplTest {
 
     }
 
-    @After
-    public void tearDown() {
-        replicatedLogImpl.journal.clear();
-        replicatedLogImpl.setSnapshotIndex(-1);
-        replicatedLogImpl.setSnapshotTerm(-1);
-        replicatedLogImpl = null;
+    @Test
+    public void testEmptyLog() {
+        replicatedLogImpl = new MockAbstractReplicatedLogImpl();
+
+        assertEquals("size", 0, replicatedLogImpl.size());
+        assertEquals("dataSize", 0, replicatedLogImpl.dataSize());
+        assertEquals("getSnapshotIndex", -1, replicatedLogImpl.getSnapshotIndex());
+        assertEquals("getSnapshotTerm", -1, replicatedLogImpl.getSnapshotTerm());
+        assertEquals("lastIndex", -1, replicatedLogImpl.lastIndex());
+        assertEquals("lastTerm", -1, replicatedLogImpl.lastTerm());
+        assertEquals("isPresent", false, replicatedLogImpl.isPresent(0));
+        assertEquals("isInSnapshot", false, replicatedLogImpl.isInSnapshot(0));
+        Assert.assertNull("get(0)", replicatedLogImpl.get(0));
+        Assert.assertNull("last", replicatedLogImpl.last());
+
+        List<ReplicatedLogEntry> list = replicatedLogImpl.getFrom(0, 1);
+        assertEquals("getFrom size", 0, list.size());
+
+        assertEquals("removeFrom", -1, replicatedLogImpl.removeFrom(1));
+
+        replicatedLogImpl.setSnapshotIndex(2);
+        replicatedLogImpl.setSnapshotTerm(1);
+
+        assertEquals("getSnapshotIndex", 2, replicatedLogImpl.getSnapshotIndex());
+        assertEquals("getSnapshotTerm", 1, replicatedLogImpl.getSnapshotTerm());
+        assertEquals("lastIndex", 2, replicatedLogImpl.lastIndex());
+        assertEquals("lastTerm", 1, replicatedLogImpl.lastTerm());
     }
 
     @Test
@@ -65,7 +86,7 @@ public class AbstractReplicatedLogImplTest {
         // now create a snapshot of 3 entries, with 1 unapplied entry left in the log
         // It removes the entries which have made it to snapshot
         // and updates the snapshot index and term
-        Map<Long, String> state = takeSnapshot(3);
+        takeSnapshot(3);
 
         // check the values after the snapshot.
         // each index value passed in the test is the logical index (log entry index)
@@ -101,7 +122,7 @@ public class AbstractReplicatedLogImplTest {
         assertEquals(2, replicatedLogImpl.getFrom(6).size());
 
         // take a second snapshot with 5 entries with 0 unapplied entries left in the log
-        state = takeSnapshot(5);
+        takeSnapshot(5);
 
         assertEquals(0, replicatedLogImpl.size());
         assertNull(replicatedLogImpl.last());
@@ -142,24 +163,67 @@ public class AbstractReplicatedLogImplTest {
         replicatedLogImpl.snapshotPreCommit(-1, -1);
         assertEquals(8, replicatedLogImpl.size());
         assertEquals(-1, replicatedLogImpl.getSnapshotIndex());
+        assertEquals(-1, replicatedLogImpl.getSnapshotTerm());
 
-        replicatedLogImpl.snapshotPreCommit(4, 3);
+        replicatedLogImpl.snapshotPreCommit(4, 2);
         assertEquals(3, replicatedLogImpl.size());
         assertEquals(4, replicatedLogImpl.getSnapshotIndex());
+        assertEquals(2, replicatedLogImpl.getSnapshotTerm());
 
         replicatedLogImpl.snapshotPreCommit(6, 3);
         assertEquals(1, replicatedLogImpl.size());
         assertEquals(6, replicatedLogImpl.getSnapshotIndex());
+        assertEquals(3, replicatedLogImpl.getSnapshotTerm());
 
         replicatedLogImpl.snapshotPreCommit(7, 3);
         assertEquals(0, replicatedLogImpl.size());
         assertEquals(7, replicatedLogImpl.getSnapshotIndex());
+        assertEquals(3, replicatedLogImpl.getSnapshotTerm());
 
         //running it again on an empty list should not throw exception
         replicatedLogImpl.snapshotPreCommit(7, 3);
         assertEquals(0, replicatedLogImpl.size());
         assertEquals(7, replicatedLogImpl.getSnapshotIndex());
+        assertEquals(3, replicatedLogImpl.getSnapshotTerm());
+    }
+
+    @Test
+    public void testSnapshotCommit() {
+
+        replicatedLogImpl.snapshotPreCommit(1, 1);
+
+        replicatedLogImpl.snapshotCommit();
+
+        assertEquals("size", 2, replicatedLogImpl.size());
+        assertEquals("dataSize", 2, replicatedLogImpl.dataSize());
+        assertEquals("getSnapshotIndex", 1, replicatedLogImpl.getSnapshotIndex());
+        assertEquals("getSnapshotTerm", 1, replicatedLogImpl.getSnapshotTerm());
+        assertEquals("lastIndex", 3, replicatedLogImpl.lastIndex());
+        assertEquals("lastTerm", 2, replicatedLogImpl.lastTerm());
+
+        Assert.assertNull("get(0)", replicatedLogImpl.get(0));
+        Assert.assertNull("get(1)", replicatedLogImpl.get(1));
+        Assert.assertNotNull("get(2)", replicatedLogImpl.get(2));
+        Assert.assertNotNull("get(3)", replicatedLogImpl.get(3));
+    }
+
+    @Test
+    public void testSnapshotRollback() {
+
+        replicatedLogImpl.snapshotPreCommit(1, 1);
+
+        assertEquals("size", 2, replicatedLogImpl.size());
+        assertEquals("getSnapshotIndex", 1, replicatedLogImpl.getSnapshotIndex());
+        assertEquals("getSnapshotTerm", 1, replicatedLogImpl.getSnapshotTerm());
+
+        replicatedLogImpl.snapshotRollback();
 
+        assertEquals("size", 4, replicatedLogImpl.size());
+        assertEquals("dataSize", 4, replicatedLogImpl.dataSize());
+        assertEquals("getSnapshotIndex", -1, replicatedLogImpl.getSnapshotIndex());
+        assertEquals("getSnapshotTerm", -1, replicatedLogImpl.getSnapshotTerm());
+        Assert.assertNotNull("get(0)", replicatedLogImpl.get(0));
+        Assert.assertNotNull("get(3)", replicatedLogImpl.get(3));
     }
 
     @Test
@@ -187,19 +251,45 @@ public class AbstractReplicatedLogImplTest {
         assertTrue(replicatedLogImpl.isPresent(5));
     }
 
+    @Test
+    public void testRemoveFrom() {
+
+        replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("E", 2)));
+        replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("F", 3)));
+
+        assertEquals("dataSize", 9, replicatedLogImpl.dataSize());
+
+        long adjusted = replicatedLogImpl.removeFrom(4);
+        assertEquals("removeFrom - adjusted", 4, adjusted);
+        assertEquals("size", 4, replicatedLogImpl.size());
+        assertEquals("dataSize", 4, replicatedLogImpl.dataSize());
+
+        takeSnapshot(1);
+
+        adjusted = replicatedLogImpl.removeFrom(2);
+        assertEquals("removeFrom - adjusted", 1, adjusted);
+        assertEquals("size", 1, replicatedLogImpl.size());
+        assertEquals("dataSize", 1, replicatedLogImpl.dataSize());
+
+        assertEquals("removeFrom - adjusted", -1, replicatedLogImpl.removeFrom(0));
+        assertEquals("removeFrom - adjusted", -1, replicatedLogImpl.removeFrom(100));
+    }
+
     // create a snapshot for test
     public Map<Long, String> takeSnapshot(final int numEntries) {
         Map<Long, String> map = new HashMap<>(numEntries);
-        List<ReplicatedLogEntry> entries = replicatedLogImpl.getEntriesTill(numEntries);
-        for (ReplicatedLogEntry entry : entries) {
+
+        long lastIndex = 0;
+        long lastTerm = 0;
+        for(int i = 0; i < numEntries; i++) {
+            ReplicatedLogEntry entry = replicatedLogImpl.getAtPhysicalIndex(i);
             map.put(entry.getIndex(), entry.getData().toString());
+            lastIndex = entry.getIndex();
+            lastTerm = entry.getTerm();
         }
 
-        int term = (int) replicatedLogImpl.lastTerm();
-        int lastIndex = (int) entries.get(entries.size() - 1).getIndex();
-        entries.clear();
-        replicatedLogImpl.setSnapshotTerm(term);
-        replicatedLogImpl.setSnapshotIndex(lastIndex);
+        replicatedLogImpl.snapshotPreCommit(lastIndex, lastTerm);
+        replicatedLogImpl.snapshotCommit();
 
         return map;
 
@@ -213,15 +303,6 @@ public class AbstractReplicatedLogImplTest {
         public void removeFromAndPersist(final long index) {
         }
 
-        @Override
-        public int dataSize() {
-            return -1;
-        }
-
-        public List<ReplicatedLogEntry> getEntriesTill(final int index) {
-            return journal.subList(0, index);
-        }
-
         @Override
         public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback) {
         }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ElectionTermImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ElectionTermImplTest.java
new file mode 100644 (file)
index 0000000..da49718
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.verify;
+import akka.japi.Procedure;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for ElectionTermImpl.
+ *
+ * @author Thomas Pantelis
+ */
+public class ElectionTermImplTest {
+    private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
+
+    @Mock
+    private DataPersistenceProvider mockPersistence;
+
+    @Before
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Test
+    public void testUpdateAndPersist() throws Exception {
+        ElectionTermImpl impl = new ElectionTermImpl(mockPersistence, "test", LOG);
+
+        impl.updateAndPersist(10, "member-1");
+
+        assertEquals("getCurrentTerm", 10, impl.getCurrentTerm());
+        assertEquals("getVotedFor", "member-1", impl.getVotedFor());
+
+        ArgumentCaptor<Object> message = ArgumentCaptor.forClass(Object.class);
+        ArgumentCaptor<Procedure> procedure = ArgumentCaptor.forClass(Procedure.class);
+        verify(mockPersistence).persist(message.capture(), procedure.capture());
+
+        assertEquals("Message type", UpdateElectionTerm.class, message.getValue().getClass());
+        UpdateElectionTerm update = (UpdateElectionTerm)message.getValue();
+        assertEquals("getCurrentTerm", 10, update.getCurrentTerm());
+        assertEquals("getVotedFor", "member-1", update.getVotedFor());
+
+        procedure.getValue().apply(null);
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java
new file mode 100644 (file)
index 0000000..586ca8c
--- /dev/null
@@ -0,0 +1,261 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.japi.Creator;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+
+public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
+
+    final RaftActor actorDelegate;
+    final RaftActorRecoveryCohort recoveryCohortDelegate;
+    final RaftActorSnapshotCohort snapshotCohortDelegate;
+    private final CountDownLatch recoveryComplete = new CountDownLatch(1);
+    private final List<Object> state;
+    private ActorRef roleChangeNotifier;
+    private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
+    private RaftActorRecoverySupport raftActorRecoverySupport;
+    private RaftActorSnapshotMessageSupport snapshotMessageSupport;
+
+    public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
+        private static final long serialVersionUID = 1L;
+        private final Map<String, String> peerAddresses;
+        private final String id;
+        private final Optional<ConfigParams> config;
+        private final DataPersistenceProvider dataPersistenceProvider;
+        private final ActorRef roleChangeNotifier;
+        private RaftActorSnapshotMessageSupport snapshotMessageSupport;
+
+        private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
+            Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
+            ActorRef roleChangeNotifier) {
+            this.peerAddresses = peerAddresses;
+            this.id = id;
+            this.config = config;
+            this.dataPersistenceProvider = dataPersistenceProvider;
+            this.roleChangeNotifier = roleChangeNotifier;
+        }
+
+        @Override
+        public MockRaftActor create() throws Exception {
+            MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
+                dataPersistenceProvider);
+            mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
+            mockRaftActor.snapshotMessageSupport = snapshotMessageSupport;
+            return mockRaftActor;
+        }
+    }
+
+    public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
+                         DataPersistenceProvider dataPersistenceProvider) {
+        super(id, peerAddresses, config);
+        state = new ArrayList<>();
+        this.actorDelegate = mock(RaftActor.class);
+        this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
+        this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
+        if(dataPersistenceProvider == null){
+            setPersistence(true);
+        } else {
+            setPersistence(dataPersistenceProvider);
+        }
+    }
+
+    public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
+        raftActorRecoverySupport = support;
+    }
+
+    @Override
+    public RaftActorRecoverySupport newRaftActorRecoverySupport() {
+        return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
+    }
+
+    @Override
+    protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
+        return snapshotMessageSupport != null ? snapshotMessageSupport : super.newRaftActorSnapshotMessageSupport();
+    }
+
+    public void waitForRecoveryComplete() {
+        try {
+            assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void waitForInitializeBehaviorComplete() {
+        try {
+            assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+
+    public void waitUntilLeader(){
+        for(int i = 0;i < 10; i++){
+            if(isLeader()){
+                break;
+            }
+            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    public List<Object> getState() {
+        return state;
+    }
+
+    public static Props props(final String id, final Map<String, String> peerAddresses,
+            Optional<ConfigParams> config){
+        return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
+    }
+
+    public static Props props(final String id, final Map<String, String> peerAddresses,
+            Optional<ConfigParams> config, RaftActorSnapshotMessageSupport snapshotMessageSupport){
+        MockRaftActorCreator creator = new MockRaftActorCreator(peerAddresses, id, config, null, null);
+        creator.snapshotMessageSupport = snapshotMessageSupport;
+        return Props.create(creator);
+    }
+
+    public static Props props(final String id, final Map<String, String> peerAddresses,
+                              Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
+        return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
+    }
+
+    public static Props props(final String id, final Map<String, String> peerAddresses,
+        Optional<ConfigParams> config, ActorRef roleChangeNotifier){
+        return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
+    }
+
+    public static Props props(final String id, final Map<String, String> peerAddresses,
+                              Optional<ConfigParams> config, ActorRef roleChangeNotifier,
+                              DataPersistenceProvider dataPersistenceProvider){
+        return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
+    }
+
+    @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
+        actorDelegate.applyState(clientActor, identifier, data);
+        LOG.info("{}: applyState called: {}", persistenceId(), data);
+
+        state.add(data);
+    }
+
+    @Override
+    @Nonnull
+    protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
+        return this;
+    }
+
+    @Override
+    protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+        return this;
+    }
+
+    @Override
+    public void startLogRecoveryBatch(int maxBatchSize) {
+    }
+
+    @Override
+    public void appendRecoveredLogEntry(Payload data) {
+        state.add(data);
+    }
+
+    @Override
+    public void applyCurrentLogRecoveryBatch() {
+    }
+
+    @Override
+    protected void onRecoveryComplete() {
+        actorDelegate.onRecoveryComplete();
+        recoveryComplete.countDown();
+    }
+
+    @Override
+    protected void initializeBehavior() {
+        super.initializeBehavior();
+        initializeBehaviorComplete.countDown();
+    }
+
+    @Override
+    public void applyRecoverySnapshot(byte[] bytes) {
+        recoveryCohortDelegate.applyRecoverySnapshot(bytes);
+        try {
+            Object data = toObject(bytes);
+            if (data instanceof List) {
+                state.addAll((List<?>) data);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void createSnapshot(ActorRef actorRef) {
+        LOG.info("{}: createSnapshot called", persistenceId());
+        snapshotCohortDelegate.createSnapshot(actorRef);
+    }
+
+    @Override
+    public void applySnapshot(byte [] snapshot) {
+        LOG.info("{}: applySnapshot called", persistenceId());
+        snapshotCohortDelegate.applySnapshot(snapshot);
+    }
+
+    @Override
+    protected void onStateChanged() {
+        actorDelegate.onStateChanged();
+    }
+
+    @Override
+    protected Optional<ActorRef> getRoleChangeNotifier() {
+        return Optional.fromNullable(roleChangeNotifier);
+    }
+
+    @Override public String persistenceId() {
+        return this.getId();
+    }
+
+    public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
+        Object obj = null;
+        ByteArrayInputStream bis = null;
+        ObjectInputStream ois = null;
+        try {
+            bis = new ByteArrayInputStream(bs);
+            ois = new ObjectInputStream(bis);
+            obj = ois.readObject();
+        } finally {
+            if (bis != null) {
+                bis.close();
+            }
+            if (ois != null) {
+                ois.close();
+            }
+        }
+        return obj;
+    }
+
+    public ReplicatedLog getReplicatedLog(){
+        return this.getRaftActorContext().getReplicatedLog();
+    }
+}
\ No newline at end of file
index 63f0df2..adf7778 100644 (file)
@@ -19,6 +19,8 @@ import com.google.protobuf.GeneratedMessage;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.NonPersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
@@ -38,6 +40,7 @@ public class MockRaftActorContext implements RaftActorContext {
     private ConfigParams configParams;
     private boolean snapshotCaptureInitiated;
     private SnapshotManager snapshotManager;
+    private DataPersistenceProvider persistenceProvider = new NonPersistentDataProvider();
 
     public MockRaftActorContext(){
         electionTerm = new ElectionTerm() {
@@ -219,6 +222,15 @@ public class MockRaftActorContext implements RaftActorContext {
         return getPeerAddresses().keySet().size() > 0;
     }
 
+    @Override
+    public DataPersistenceProvider getPersistenceProvider() {
+        return persistenceProvider;
+    }
+
+    public void setPersistenceProvider(DataPersistenceProvider persistenceProvider) {
+        this.persistenceProvider = persistenceProvider;
+    }
+
     public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
         @Override public void appendAndPersist(
             ReplicatedLogEntry replicatedLogEntry) {
@@ -266,8 +278,8 @@ public class MockRaftActorContext implements RaftActorContext {
             this.size = size;
         }
 
-        @Override public  Map<GeneratedMessage.GeneratedExtension, String> encode() {
-            Map<GeneratedMessage.GeneratedExtension, String> map = new HashMap<GeneratedMessage.GeneratedExtension, String>();
+        @Override public Map<GeneratedMessage.GeneratedExtension<?, ?>, String> encode() {
+            Map<GeneratedMessage.GeneratedExtension<?, ?>, String> map = new HashMap<>();
             map.put(MockPayloadMessages.value, value);
             return map;
         }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java
new file mode 100644 (file)
index 0000000..c4e0ef8
--- /dev/null
@@ -0,0 +1,311 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.SnapshotMetadata;
+import akka.persistence.SnapshotOffer;
+import java.util.Arrays;
+import java.util.Collections;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for RaftActorRecoverySupport.
+ *
+ * @author Thomas Pantelis
+ */
+public class RaftActorRecoverySupportTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
+
+    @Mock
+    private DataPersistenceProvider mockPersistence;
+
+    @Mock
+    private RaftActorBehavior mockBehavior;
+
+    @Mock
+    private RaftActorRecoveryCohort mockCohort;
+
+    private RaftActorRecoverySupport support;
+
+    private RaftActorContext context;
+    private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+
+    @Before
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+
+        context = new RaftActorContextImpl(null, null, "test", new ElectionTermImpl(mockPersistence, "test", LOG),
+                -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
+
+        support = new RaftActorRecoverySupport(mockPersistence, context , mockBehavior, mockCohort);
+
+        doReturn(true).when(mockPersistence).isRecoveryApplicable();
+
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior));
+    }
+
+    private void sendMessageToSupport(Object message) {
+        sendMessageToSupport(message, false);
+    }
+
+    private void sendMessageToSupport(Object message, boolean expComplete) {
+        boolean complete = support.handleRecoveryMessage(message);
+        assertEquals("complete", expComplete, complete);
+    }
+
+    @Test
+    public void testOnReplicatedLogEntry() {
+        MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
+                1, new MockRaftActorContext.MockPayload("1", 5));
+
+        sendMessageToSupport(logEntry);
+
+        assertEquals("Journal log size", 1, context.getReplicatedLog().size());
+        assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
+        assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
+        assertEquals("Last applied", -1, context.getLastApplied());
+        assertEquals("Commit index", -1, context.getCommitIndex());
+        assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
+    }
+
+    @Test
+    public void testOnApplyJournalEntries() {
+        configParams.setJournalRecoveryLogBatchSize(5);
+
+        ReplicatedLog replicatedLog = context.getReplicatedLog();
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                0, new MockRaftActorContext.MockPayload("0")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                1, new MockRaftActorContext.MockPayload("1")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                2, new MockRaftActorContext.MockPayload("2")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                3, new MockRaftActorContext.MockPayload("3")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                4, new MockRaftActorContext.MockPayload("4")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                5, new MockRaftActorContext.MockPayload("5")));
+
+        sendMessageToSupport(new ApplyJournalEntries(2));
+
+        assertEquals("Last applied", 2, context.getLastApplied());
+        assertEquals("Commit index", 2, context.getCommitIndex());
+
+        sendMessageToSupport(new ApplyJournalEntries(4));
+
+        assertEquals("Last applied", 4, context.getLastApplied());
+        assertEquals("Last applied", 4, context.getLastApplied());
+
+        sendMessageToSupport(new ApplyJournalEntries(5));
+
+        assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
+        assertEquals("Last applied", 5, context.getLastApplied());
+        assertEquals("Commit index", 5, context.getCommitIndex());
+        assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
+
+        InOrder inOrder = Mockito.inOrder(mockCohort);
+        inOrder.verify(mockCohort).startLogRecoveryBatch(5);
+
+        for(int i = 0; i < replicatedLog.size() - 1; i++) {
+            inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
+        }
+
+        inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
+        inOrder.verify(mockCohort).startLogRecoveryBatch(5);
+        inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
+
+        inOrder.verifyNoMoreInteractions();
+    }
+
+    @Test
+    public void testOnApplyLogEntries() {
+        ReplicatedLog replicatedLog = context.getReplicatedLog();
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                0, new MockRaftActorContext.MockPayload("0")));
+
+        sendMessageToSupport(new ApplyLogEntries(0));
+
+        assertEquals("Last applied", 0, context.getLastApplied());
+        assertEquals("Commit index", 0, context.getCommitIndex());
+    }
+
+    @Test
+    public void testOnSnapshotOffer() {
+
+        ReplicatedLog replicatedLog = context.getReplicatedLog();
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                1, new MockRaftActorContext.MockPayload("1")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                2, new MockRaftActorContext.MockPayload("2")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                3, new MockRaftActorContext.MockPayload("3")));
+
+        byte[] snapshotBytes = {1,2,3,4,5};
+
+        ReplicatedLogEntry unAppliedEntry1 = new MockRaftActorContext.MockReplicatedLogEntry(1,
+                4, new MockRaftActorContext.MockPayload("4", 4));
+
+        ReplicatedLogEntry unAppliedEntry2 = new MockRaftActorContext.MockReplicatedLogEntry(1,
+                5, new MockRaftActorContext.MockPayload("5", 5));
+
+        int lastAppliedDuringSnapshotCapture = 3;
+        int lastIndexDuringSnapshotCapture = 5;
+
+        Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry1, unAppliedEntry2),
+                lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1);
+
+        SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
+        SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
+
+        sendMessageToSupport(snapshotOffer);
+
+        assertEquals("Journal log size", 2, context.getReplicatedLog().size());
+        assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
+        assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
+        assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
+        assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
+        assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
+
+        verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
+    }
+
+    @Test
+    public void testOnRecoveryCompletedWithRemainingBatch() {
+        ReplicatedLog replicatedLog = context.getReplicatedLog();
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                0, new MockRaftActorContext.MockPayload("0")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                1, new MockRaftActorContext.MockPayload("1")));
+
+        sendMessageToSupport(new ApplyJournalEntries(1));
+
+        sendMessageToSupport(RecoveryCompleted.getInstance(), true);
+
+        assertEquals("Last applied", 1, context.getLastApplied());
+        assertEquals("Commit index", 1, context.getCommitIndex());
+
+        InOrder inOrder = Mockito.inOrder(mockCohort);
+        inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
+
+        for(int i = 0; i < replicatedLog.size(); i++) {
+            inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
+        }
+
+        inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
+
+        inOrder.verifyNoMoreInteractions();
+    }
+
+    @Test
+    public void testOnRecoveryCompletedWithNoRemainingBatch() {
+        sendMessageToSupport(RecoveryCompleted.getInstance(), true);
+
+        verifyNoMoreInteractions(mockCohort);
+    }
+
+    @Test
+    public void testOnDeprecatedDeleteEntries() {
+        ReplicatedLog replicatedLog = context.getReplicatedLog();
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                0, new MockRaftActorContext.MockPayload("0")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                1, new MockRaftActorContext.MockPayload("1")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                2, new MockRaftActorContext.MockPayload("2")));
+
+        sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1));
+
+        assertEquals("Journal log size", 1, context.getReplicatedLog().size());
+        assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
+    }
+
+    @Test
+    public void testOnDeleteEntries() {
+        ReplicatedLog replicatedLog = context.getReplicatedLog();
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                0, new MockRaftActorContext.MockPayload("0")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                1, new MockRaftActorContext.MockPayload("1")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                2, new MockRaftActorContext.MockPayload("2")));
+
+        sendMessageToSupport(new DeleteEntries(1));
+
+        assertEquals("Journal log size", 1, context.getReplicatedLog().size());
+        assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
+    }
+
+    @Test
+    public void testUpdateElectionTerm() {
+
+        sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
+
+        assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
+        assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
+    }
+
+    @Test
+    public void testRecoveryWithPersistenceDisabled() {
+        doReturn(false).when(mockPersistence).isRecoveryApplicable();
+
+        Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
+        SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
+
+        sendMessageToSupport(snapshotOffer);
+
+        sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                4, new MockRaftActorContext.MockPayload("4")));
+        sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                5, new MockRaftActorContext.MockPayload("5")));
+
+        sendMessageToSupport(new ApplyJournalEntries(4));
+
+        sendMessageToSupport(new DeleteEntries(5));
+
+        sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(5));
+
+        assertEquals("Journal log size", 0, context.getReplicatedLog().size());
+        assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
+        assertEquals("Last applied", -1, context.getLastApplied());
+        assertEquals("Commit index", -1, context.getCommitIndex());
+        assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
+
+        sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
+
+        assertEquals("Current term", 0, context.getTermInformation().getCurrentTerm());
+        assertEquals("Voted For", null, context.getTermInformation().getVotedFor());
+
+        sendMessageToSupport(RecoveryCompleted.getInstance(), true);
+
+        verifyNoMoreInteractions(mockCohort);
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java
new file mode 100644 (file)
index 0000000..ae9c784
--- /dev/null
@@ -0,0 +1,177 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import akka.actor.ActorRef;
+import akka.japi.Procedure;
+import akka.persistence.SaveSnapshotFailure;
+import akka.persistence.SaveSnapshotSuccess;
+import akka.persistence.SnapshotMetadata;
+import java.util.Arrays;
+import java.util.Collections;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for RaftActorSnapshotMessageSupport.
+ *
+ * @author Thomas Pantelis
+ */
+public class RaftActorSnapshotMessageSupportTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
+
+    @Mock
+    private DataPersistenceProvider mockPersistence;
+
+    @Mock
+    private RaftActorBehavior mockBehavior;
+
+    @Mock
+    private RaftActorSnapshotCohort mockCohort;
+
+    @Mock
+    private SnapshotManager mockSnapshotManager;
+
+    @Mock
+    ActorRef mockRaftActorRef;
+
+    private RaftActorSnapshotMessageSupport support;
+
+    private RaftActorContext context;
+    private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+
+    @Before
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+
+        context = new RaftActorContextImpl(mockRaftActorRef, null, "test",
+                new ElectionTermImpl(mockPersistence, "test", LOG),
+                -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG) {
+            @Override
+            public SnapshotManager getSnapshotManager() {
+                return mockSnapshotManager;
+            }
+        };
+
+        support = new RaftActorSnapshotMessageSupport(mockPersistence, context, mockBehavior, mockCohort);
+
+        doReturn(true).when(mockPersistence).isRecoveryApplicable();
+
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior));
+    }
+
+    private void sendMessageToSupport(Object message) {
+        sendMessageToSupport(message, true);
+    }
+
+    private void sendMessageToSupport(Object message, boolean expHandled) {
+        boolean handled = support.handleSnapshotMessage(message);
+        assertEquals("complete", expHandled, handled);
+    }
+
+    @Test
+    public void testOnApplySnapshot() {
+
+        ReplicatedLog replicatedLog = context.getReplicatedLog();
+        replicatedLog.append(new MockReplicatedLogEntry(1, 1, new MockPayload("1")));
+
+        byte[] snapshotBytes = {1,2,3,4,5};
+
+        ReplicatedLogEntry unAppliedEntry = new MockReplicatedLogEntry(1, 2, new MockPayload("2"));
+
+        long lastAppliedDuringSnapshotCapture = 1;
+        long lastIndexDuringSnapshotCapture = 2;
+
+        Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry),
+                lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1);
+
+        sendMessageToSupport(new ApplySnapshot(snapshot));
+
+        assertEquals("Journal log size", 1, context.getReplicatedLog().size());
+        assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
+        assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
+        assertEquals("Commit index", -1, context.getCommitIndex());
+        assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
+
+        verify(mockCohort).applySnapshot(snapshotBytes);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Test
+    public void testOnCaptureSnapshot() throws Exception {
+
+        sendMessageToSupport(new CaptureSnapshot(3, 1, 2, 1, 2, 1, null));
+
+        ArgumentCaptor<Procedure> procedure = ArgumentCaptor.forClass(Procedure.class);
+        verify(mockSnapshotManager).create(procedure.capture());
+
+        procedure.getValue().apply(null);
+
+        verify(mockCohort).createSnapshot(same(mockRaftActorRef));
+    }
+
+    @Test
+    public void testOnCaptureSnapshotReply() {
+
+        byte[] snapshot = {1,2,3,4,5};
+        sendMessageToSupport(new CaptureSnapshotReply(snapshot));
+
+        verify(mockSnapshotManager).persist(same(mockPersistence), same(snapshot), same(mockBehavior), anyLong());
+    }
+
+    @Test
+    public void testOnSaveSnapshotSuccess() {
+
+        long sequenceNumber = 100;
+        sendMessageToSupport(new SaveSnapshotSuccess(new SnapshotMetadata("foo", sequenceNumber, 1234L)));
+
+        verify(mockSnapshotManager).commit(mockPersistence, sequenceNumber);
+    }
+
+    @Test
+    public void testOnSaveSnapshotFailure() {
+
+        sendMessageToSupport(new SaveSnapshotFailure(new SnapshotMetadata("foo", 100, 1234L),
+                new Throwable("mock")));
+
+        verify(mockSnapshotManager).rollback();
+    }
+
+    @Test
+    public void testOnCommitSnapshot() {
+
+        sendMessageToSupport(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
+
+        verify(mockSnapshotManager).commit(mockPersistence, -1);
+    }
+
+    @Test
+    public void testUnhandledMessage() {
+
+        sendMessageToSupport("unhandled", false);
+    }
+}
index 17a81ac..82ebcd1 100644 (file)
@@ -2,43 +2,32 @@ package org.opendaylight.controller.cluster.raft;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Terminated;
-import akka.japi.Creator;
 import akka.japi.Procedure;
-import akka.pattern.Patterns;
-import akka.persistence.RecoveryCompleted;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotMetadata;
 import akka.persistence.SnapshotOffer;
-import akka.persistence.SnapshotSelectionCriteria;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
-import akka.util.Timeout;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -46,16 +35,12 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
-import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
@@ -63,26 +48,26 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntrie
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
-import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
-import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
 
 public class RaftActorTest extends AbstractActorTest {
 
+    static final Logger TEST_LOG = LoggerFactory.getLogger(RaftActorTest.class);
+
     private TestActorFactory factory;
 
     @Before
@@ -97,258 +82,6 @@ public class RaftActorTest extends AbstractActorTest {
         InMemorySnapshotStore.clear();
     }
 
-    public static class MockRaftActor extends RaftActor {
-
-        private final RaftActor delegate;
-        private final CountDownLatch recoveryComplete = new CountDownLatch(1);
-        private final List<Object> state;
-        private ActorRef roleChangeNotifier;
-        private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
-
-        public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
-            private static final long serialVersionUID = 1L;
-            private final Map<String, String> peerAddresses;
-            private final String id;
-            private final Optional<ConfigParams> config;
-            private final DataPersistenceProvider dataPersistenceProvider;
-            private final ActorRef roleChangeNotifier;
-
-            private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
-                Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
-                ActorRef roleChangeNotifier) {
-                this.peerAddresses = peerAddresses;
-                this.id = id;
-                this.config = config;
-                this.dataPersistenceProvider = dataPersistenceProvider;
-                this.roleChangeNotifier = roleChangeNotifier;
-            }
-
-            @Override
-            public MockRaftActor create() throws Exception {
-                MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
-                    dataPersistenceProvider);
-                mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
-                return mockRaftActor;
-            }
-        }
-
-        public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
-                             DataPersistenceProvider dataPersistenceProvider) {
-            super(id, peerAddresses, config);
-            state = new ArrayList<>();
-            this.delegate = mock(RaftActor.class);
-            if(dataPersistenceProvider == null){
-                setPersistence(true);
-            } else {
-                setPersistence(dataPersistenceProvider);
-            }
-        }
-
-        public void waitForRecoveryComplete() {
-            try {
-                assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-
-        public void waitForInitializeBehaviorComplete() {
-            try {
-                assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-
-
-        public void waitUntilLeader(){
-            for(int i = 0;i < 10; i++){
-                if(isLeader()){
-                    break;
-                }
-                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
-            }
-        }
-
-        public List<Object> getState() {
-            return state;
-        }
-
-        public static Props props(final String id, final Map<String, String> peerAddresses,
-                Optional<ConfigParams> config){
-            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
-        }
-
-        public static Props props(final String id, final Map<String, String> peerAddresses,
-                                  Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
-            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
-        }
-
-        public static Props props(final String id, final Map<String, String> peerAddresses,
-            Optional<ConfigParams> config, ActorRef roleChangeNotifier){
-            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
-        }
-
-        public static Props props(final String id, final Map<String, String> peerAddresses,
-                                  Optional<ConfigParams> config, ActorRef roleChangeNotifier,
-                                  DataPersistenceProvider dataPersistenceProvider){
-            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
-        }
-
-
-        @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
-            delegate.applyState(clientActor, identifier, data);
-            LOG.info("{}: applyState called", persistenceId());
-        }
-
-        @Override
-        protected void startLogRecoveryBatch(int maxBatchSize) {
-        }
-
-        @Override
-        protected void appendRecoveredLogEntry(Payload data) {
-            state.add(data);
-        }
-
-        @Override
-        protected void applyCurrentLogRecoveryBatch() {
-        }
-
-        @Override
-        protected void onRecoveryComplete() {
-            delegate.onRecoveryComplete();
-            recoveryComplete.countDown();
-        }
-
-        @Override
-        protected void initializeBehavior() {
-            super.initializeBehavior();
-            initializeBehaviorComplete.countDown();
-        }
-
-        @Override
-        protected void applyRecoverySnapshot(byte[] bytes) {
-            delegate.applyRecoverySnapshot(bytes);
-            try {
-                Object data = toObject(bytes);
-                if (data instanceof List) {
-                    state.addAll((List<?>) data);
-                }
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-
-        @Override protected void createSnapshot() {
-            LOG.info("{}: createSnapshot called", persistenceId());
-            delegate.createSnapshot();
-        }
-
-        @Override protected void applySnapshot(byte [] snapshot) {
-            LOG.info("{}: applySnapshot called", persistenceId());
-            delegate.applySnapshot(snapshot);
-        }
-
-        @Override protected void onStateChanged() {
-            delegate.onStateChanged();
-        }
-
-        @Override
-        protected Optional<ActorRef> getRoleChangeNotifier() {
-            return Optional.fromNullable(roleChangeNotifier);
-        }
-
-        @Override public String persistenceId() {
-            return this.getId();
-        }
-
-        private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
-            Object obj = null;
-            ByteArrayInputStream bis = null;
-            ObjectInputStream ois = null;
-            try {
-                bis = new ByteArrayInputStream(bs);
-                ois = new ObjectInputStream(bis);
-                obj = ois.readObject();
-            } finally {
-                if (bis != null) {
-                    bis.close();
-                }
-                if (ois != null) {
-                    ois.close();
-                }
-            }
-            return obj;
-        }
-
-        public ReplicatedLog getReplicatedLog(){
-            return this.getRaftActorContext().getReplicatedLog();
-        }
-
-    }
-
-
-    public static class RaftActorTestKit extends JavaTestKit {
-        private final ActorRef raftActor;
-
-        public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
-            super(actorSystem);
-
-            raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
-                    Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
-
-        }
-
-
-        public ActorRef getRaftActor() {
-            return raftActor;
-        }
-
-        public boolean waitForLogMessage(final Class<?> logEventClass, String message){
-            // Wait for a specific log message to show up
-            return
-                new JavaTestKit.EventFilter<Boolean>(logEventClass
-                ) {
-                    @Override
-                    protected Boolean run() {
-                        return true;
-                    }
-                }.from(raftActor.path().toString())
-                    .message(message)
-                    .occurrences(1).exec();
-
-
-        }
-
-        protected void waitUntilLeader(){
-            waitUntilLeader(raftActor);
-        }
-
-        public static void waitUntilLeader(ActorRef actorRef) {
-            FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
-            for(int i = 0; i < 20 * 5; i++) {
-                Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
-                try {
-                    FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
-                    if(resp.getLeaderActor() != null) {
-                        return;
-                    }
-                } catch(TimeoutException e) {
-                } catch(Exception e) {
-                    System.err.println("FindLeader threw ex");
-                    e.printStackTrace();
-                }
-
-
-                Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
-            }
-
-            Assert.fail("Leader not found for actorRef " + actorRef.path());
-        }
-
-    }
-
-
     @Test
     public void testConstruction() {
         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
@@ -361,18 +94,22 @@ public class RaftActorTest extends AbstractActorTest {
     }
 
     @Test
-    public void testRaftActorRecovery() throws Exception {
+    public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception {
+        TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled starting");
+
         new JavaTestKit(getSystem()) {{
             String persistenceId = factory.generateActorId("follower-");
 
             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
             // Set the heartbeat interval high to essentially disable election otherwise the test
             // may fail if the actor is switched to Leader and the commitIndex is set to the last
             // log entry.
             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
+            ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
             ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
-                    Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
+                    peerAddresses, Optional.<ConfigParams>of(config)), persistenceId);
 
             watch(followerActor);
 
@@ -399,11 +136,11 @@ public class RaftActorTest extends AbstractActorTest {
             // add more entries after snapshot is taken
             List<ReplicatedLogEntry> entries = new ArrayList<>();
             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
-                    new MockRaftActorContext.MockPayload("F"));
+                    new MockRaftActorContext.MockPayload("F", 2));
             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
-                    new MockRaftActorContext.MockPayload("G"));
+                    new MockRaftActorContext.MockPayload("G", 3));
             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
-                    new MockRaftActorContext.MockPayload("H"));
+                    new MockRaftActorContext.MockPayload("H", 4));
             entries.add(entry2);
             entries.add(entry3);
             entries.add(entry4);
@@ -425,278 +162,151 @@ public class RaftActorTest extends AbstractActorTest {
 
             //reinstate the actor
             TestActorRef<MockRaftActor> ref = factory.createTestActor(
-                    MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
-                            Optional.<ConfigParams>of(config)));
+                    MockRaftActor.props(persistenceId, peerAddresses, Optional.<ConfigParams>of(config)));
+
+            MockRaftActor mockRaftActor = ref.underlyingActor();
 
-            ref.underlyingActor().waitForRecoveryComplete();
+            mockRaftActor.waitForRecoveryComplete();
 
-            RaftActorContext context = ref.underlyingActor().getRaftActorContext();
+            RaftActorContext context = mockRaftActor.getRaftActorContext();
             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
                     context.getReplicatedLog().size());
+            assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
-            assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
+            assertEquals("Recovered state size", 6, mockRaftActor.getState().size());
+
+            mockRaftActor.waitForInitializeBehaviorComplete();
+
+            assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
         }};
+
+        TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled ending");
     }
 
     @Test
-    public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
+    public void testRaftActorRecoveryWithPersistenceDisabled() throws Exception {
         new JavaTestKit(getSystem()) {{
-            String persistenceId = factory.generateActorId("leader-");
+            String persistenceId = factory.generateActorId("follower-");
 
             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-            config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
-            // Setup the persisted journal with some entries
-            ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
-                    new MockRaftActorContext.MockPayload("zero"));
-            ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
-                    new MockRaftActorContext.MockPayload("oen"));
-            ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
-                    new MockRaftActorContext.MockPayload("two"));
+            config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
-            long seqNr = 1;
-            InMemoryJournal.addEntry(persistenceId, seqNr++, entry0);
-            InMemoryJournal.addEntry(persistenceId, seqNr++, entry1);
-            InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1));
-            InMemoryJournal.addEntry(persistenceId, seqNr++, entry2);
+            TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
+                    ImmutableMap.<String, String>builder().put("member1", "address").build(),
+                    Optional.<ConfigParams>of(config), new NonPersistentDataProvider()), persistenceId);
 
-            int lastAppliedToState = 1;
-            int lastIndex = 2;
+            MockRaftActor mockRaftActor = ref.underlyingActor();
 
-            //reinstate the actor
-            TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
-                    MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
-                            Optional.<ConfigParams>of(config)));
+            mockRaftActor.waitForRecoveryComplete();
 
-            leaderActor.underlyingActor().waitForRecoveryComplete();
+            mockRaftActor.waitForInitializeBehaviorComplete();
 
-            RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
-            assertEquals("Journal log size", 3, context.getReplicatedLog().size());
-            assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
-            assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
-            assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
+            assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
         }};
     }
 
-    /**
-     * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
-     * process recovery messages
-     *
-     * @throws Exception
-     */
-
     @Test
-    public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                String persistenceId = factory.generateActorId("leader-");
+    public void testRaftActorForwardsToRaftActorRecoverySupport() {
+        String persistenceId = factory.generateActorId("leader-");
 
-                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
 
-                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+        config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
-                TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                        Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
+        TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
+                Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
 
-                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+        MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
-                // Wait for akka's recovery to complete so it doesn't interfere.
-                mockRaftActor.waitForRecoveryComplete();
-
-                ByteString snapshotBytes = fromObject(Arrays.asList(
-                        new MockRaftActorContext.MockPayload("A"),
-                        new MockRaftActorContext.MockPayload("B"),
-                        new MockRaftActorContext.MockPayload("C"),
-                        new MockRaftActorContext.MockPayload("D")));
+        // Wait for akka's recovery to complete so it doesn't interfere.
+        mockRaftActor.waitForRecoveryComplete();
 
-                Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
-                        Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
+        RaftActorRecoverySupport mockSupport = mock(RaftActorRecoverySupport.class);
+        mockRaftActor.setRaftActorRecoverySupport(mockSupport );
 
-                mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
+        Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
+        SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
+        mockRaftActor.handleRecover(snapshotOffer);
 
-                verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
+        MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
+                1, new MockRaftActorContext.MockPayload("1", 5));
+        mockRaftActor.handleRecover(logEntry);
 
-                mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
+        ApplyJournalEntries applyJournalEntries = new ApplyJournalEntries(2);
+        mockRaftActor.handleRecover(applyJournalEntries);
 
-                ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
+        ApplyLogEntries applyLogEntries = new ApplyLogEntries(0);
+        mockRaftActor.handleRecover(applyLogEntries);
 
-                assertEquals("add replicated log entry", 1, replicatedLog.size());
+        DeleteEntries deleteEntries = new DeleteEntries(1);
+        mockRaftActor.handleRecover(deleteEntries);
 
-                mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
+        org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries deprecatedDeleteEntries =
+                new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1);
+        mockRaftActor.handleRecover(deprecatedDeleteEntries);
 
-                assertEquals("add replicated log entry", 2, replicatedLog.size());
+        UpdateElectionTerm updateElectionTerm = new UpdateElectionTerm(5, "member2");
+        mockRaftActor.handleRecover(updateElectionTerm);
 
-                mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
-
-                assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
-
-                // The snapshot had 4 items + we added 2 more items during the test
-                // We start removing from 5 and we should get 1 item in the replicated log
-                mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
-
-                assertEquals("remove log entries", 1, replicatedLog.size());
-
-                mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar"));
-
-                assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
-                assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
-
-                mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
-
-            }};
+        verify(mockSupport).handleRecoveryMessage(same(snapshotOffer));
+        verify(mockSupport).handleRecoveryMessage(same(logEntry));
+        verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries));
+        verify(mockSupport).handleRecoveryMessage(same(applyLogEntries));
+        verify(mockSupport).handleRecoveryMessage(same(deleteEntries));
+        verify(mockSupport).handleRecoveryMessage(same(deprecatedDeleteEntries));
+        verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm));
     }
 
-    /**
-     * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
-     * not process recovery messages
-     *
-     * @throws Exception
-     */
     @Test
-    public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                String persistenceId = factory.generateActorId("leader-");
+    public void testRaftActorForwardsToRaftActorSnapshotMessageSupport() {
+        String persistenceId = factory.generateActorId("leader-");
 
-                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-
-                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
-                TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                        Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
-
-                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
-
-                // Wait for akka's recovery to complete so it doesn't interfere.
-                mockRaftActor.waitForRecoveryComplete();
-
-                ByteString snapshotBytes = fromObject(Arrays.asList(
-                        new MockRaftActorContext.MockPayload("A"),
-                        new MockRaftActorContext.MockPayload("B"),
-                        new MockRaftActorContext.MockPayload("C"),
-                        new MockRaftActorContext.MockPayload("D")));
-
-                Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
-                        Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
-
-                mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
-
-                verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
-
-                mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
-
-                ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
-
-                assertEquals("add replicated log entry", 0, replicatedLog.size());
-
-                mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
-
-                assertEquals("add replicated log entry", 0, replicatedLog.size());
-
-                mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
-
-                assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
-
-                mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
 
-                assertEquals("remove log entries", 0, replicatedLog.size());
+        config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
-                mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar"));
+        RaftActorSnapshotMessageSupport mockSupport = mock(RaftActorSnapshotMessageSupport.class);
 
-                assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
-                assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
+        TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
+                Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), mockSupport), persistenceId);
 
-                mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
-            }};
-    }
+        MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
+        // Wait for akka's recovery to complete so it doesn't interfere.
+        mockRaftActor.waitForRecoveryComplete();
 
-    @Test
-    public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                String persistenceId = factory.generateActorId("leader-");
+        ApplySnapshot applySnapshot = new ApplySnapshot(mock(Snapshot.class));
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot));
+        mockRaftActor.handleCommand(applySnapshot);
 
-                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1, null);
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot));
+        mockRaftActor.handleCommand(captureSnapshot);
 
-                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+        CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(new byte[0]);
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply));
+        mockRaftActor.handleCommand(captureSnapshotReply);
 
-                CountDownLatch persistLatch = new CountDownLatch(1);
-                DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
-                dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
+        SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(mock(SnapshotMetadata.class));
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess));
+        mockRaftActor.handleCommand(saveSnapshotSuccess);
 
-                TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                        Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
+        SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(mock(SnapshotMetadata.class), new Throwable());
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure));
+        mockRaftActor.handleCommand(saveSnapshotFailure);
 
-                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
-
-                mockRaftActor.waitForInitializeBehaviorComplete();
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT));
+        mockRaftActor.handleCommand(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
 
-                mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
-
-                assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
-            }
-        };
-    }
-
-    @Test
-    public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                String persistenceId = factory.generateActorId("leader-");
-
-                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-
-                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
-                DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
-
-                TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                        Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
-
-                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
-
-                mockRaftActor.waitForInitializeBehaviorComplete();
-
-                MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
-
-                mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
-
-                verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
-            }
-        };
-    }
-
-    @Test
-    public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                String persistenceId = factory.generateActorId("leader-");
-
-                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-
-                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
-                DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
-
-                TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                        Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
-
-                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
-
-                mockRaftActor.waitForInitializeBehaviorComplete();
-
-                mockRaftActor.waitUntilLeader();
-
-                mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
-
-                mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
-
-                verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class));
-            }
-        };
+        verify(mockSupport).handleSnapshotMessage(same(applySnapshot));
+        verify(mockSupport).handleSnapshotMessage(same(captureSnapshot));
+        verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply));
+        verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess));
+        verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure));
+        verify(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT));
     }
 
     @Test
@@ -729,112 +339,6 @@ public class RaftActorTest extends AbstractActorTest {
         };
     }
 
-    @Test
-    public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                String persistenceId = factory.generateActorId("leader-");
-
-                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-
-                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
-                DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
-
-                TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
-                        MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
-                                Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
-
-                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
-
-                mockRaftActor.waitForInitializeBehaviorComplete();
-
-                ByteString snapshotBytes = fromObject(Arrays.asList(
-                        new MockRaftActorContext.MockPayload("A"),
-                        new MockRaftActorContext.MockPayload("B"),
-                        new MockRaftActorContext.MockPayload("C"),
-                        new MockRaftActorContext.MockPayload("D")));
-
-                RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
-
-                raftActorContext.getSnapshotManager().capture(
-                        new MockRaftActorContext.MockReplicatedLogEntry(1, -1,
-                                new MockRaftActorContext.MockPayload("D")), -1);
-
-                mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
-
-                mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
-
-                verify(dataPersistenceProvider).saveSnapshot(anyObject());
-
-            }
-        };
-    }
-
-    @Test
-    public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                String persistenceId = factory.generateActorId("leader-");
-
-                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-
-                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
-                DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
-
-                TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                        ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
-
-                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
-
-                mockRaftActor.waitForInitializeBehaviorComplete();
-                MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class));
-
-                mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
-                mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
-                mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
-                mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
-                mockRaftActor.getReplicatedLog().append(lastEntry);
-
-                ByteString snapshotBytes = fromObject(Arrays.asList(
-                        new MockRaftActorContext.MockPayload("A"),
-                        new MockRaftActorContext.MockPayload("B"),
-                        new MockRaftActorContext.MockPayload("C"),
-                        new MockRaftActorContext.MockPayload("D")));
-
-                RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
-                mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
-
-                long replicatedToAllIndex = 1;
-
-                mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
-
-                verify(mockRaftActor.delegate).createSnapshot();
-
-                mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
-
-                mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
-
-                verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
-
-                verify(dataPersistenceProvider).deleteMessages(100);
-
-                assertEquals(3, mockRaftActor.getReplicatedLog().size());
-                assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
-
-                assertNotNull(mockRaftActor.getReplicatedLog().get(2));
-                assertNotNull(mockRaftActor.getReplicatedLog().get(3));
-                assertNotNull(mockRaftActor.getReplicatedLog().get(4));
-
-                // Index 2 will not be in the log because it was removed due to snapshotting
-                assertNull(mockRaftActor.getReplicatedLog().get(1));
-                assertNull(mockRaftActor.getReplicatedLog().get(0));
-
-            }
-        };
-    }
-
     @Test
     public void testApplyState() throws Exception {
 
@@ -860,107 +364,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
 
-                verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
-
-            }
-        };
-    }
-
-    @Test
-    public void testApplySnapshot() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                String persistenceId = factory.generateActorId("leader-");
-
-                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-
-                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
-                DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
-
-                TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                        Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
-
-                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
-
-                mockRaftActor.waitForInitializeBehaviorComplete();
-
-                ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
-
-                oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
-                oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
-                oldReplicatedLog.append(
-                        new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
-                                mock(Payload.class)));
-
-                ByteString snapshotBytes = fromObject(Arrays.asList(
-                        new MockRaftActorContext.MockPayload("A"),
-                        new MockRaftActorContext.MockPayload("B"),
-                        new MockRaftActorContext.MockPayload("C"),
-                        new MockRaftActorContext.MockPayload("D")));
-
-                Snapshot snapshot = mock(Snapshot.class);
-
-                doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
-
-                doReturn(3L).when(snapshot).getLastAppliedIndex();
-
-                mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
-
-                verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
-
-                assertTrue("The replicatedLog should have changed",
-                        oldReplicatedLog != mockRaftActor.getReplicatedLog());
-
-                assertEquals("lastApplied should be same as in the snapshot",
-                        (Long) 3L, mockRaftActor.getLastApplied());
-
-                assertEquals(0, mockRaftActor.getReplicatedLog().size());
-
-            }
-        };
-    }
-
-    @Test
-    public void testSaveSnapshotFailure() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                String persistenceId = factory.generateActorId("leader-");
-
-                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-
-                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
-                DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
-
-                TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                        Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
-
-                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
-
-                mockRaftActor.waitForInitializeBehaviorComplete();
-
-                ByteString snapshotBytes = fromObject(Arrays.asList(
-                        new MockRaftActorContext.MockPayload("A"),
-                        new MockRaftActorContext.MockPayload("B"),
-                        new MockRaftActorContext.MockPayload("C"),
-                        new MockRaftActorContext.MockPayload("D")));
-
-                RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
-
-                mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
-
-                raftActorContext.getSnapshotManager().capture(
-                        new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
-                                new MockRaftActorContext.MockPayload("D")), 1);
-
-                mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
-
-                mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
-                        new Exception()));
-
-                assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
-                        mockRaftActor.getReplicatedLog().getSnapshotIndex());
+                verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
 
             }
         };
@@ -1131,7 +535,7 @@ public class RaftActorTest extends AbstractActorTest {
                         .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
                                 new MockRaftActorContext.MockPayload("x")), 4);
 
-                verify(leaderActor.delegate).createSnapshot();
+                verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
 
                 assertEquals(8, leaderActor.getReplicatedLog().size());
 
@@ -1230,7 +634,7 @@ public class RaftActorTest extends AbstractActorTest {
                         new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
                                 new MockRaftActorContext.MockPayload("D")), 4);
 
-                verify(followerActor.delegate).createSnapshot();
+                verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
 
                 assertEquals(6, followerActor.getReplicatedLog().size());
 
@@ -1466,7 +870,7 @@ public class RaftActorTest extends AbstractActorTest {
         }};
     }
 
-    private ByteString fromObject(Object snapshot) throws Exception {
+    public static ByteString fromObject(Object snapshot) throws Exception {
         ByteArrayOutputStream b = null;
         ObjectOutputStream o = null;
         try {
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTestKit.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTestKit.java
new file mode 100644 (file)
index 0000000..3e747e3
--- /dev/null
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.junit.Assert;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+public class RaftActorTestKit extends JavaTestKit {
+    private final ActorRef raftActor;
+
+    public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
+        super(actorSystem);
+
+        raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
+                Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
+
+    }
+
+
+    public ActorRef getRaftActor() {
+        return raftActor;
+    }
+
+    public boolean waitForLogMessage(final Class<?> logEventClass, String message){
+        // Wait for a specific log message to show up
+        return
+            new JavaTestKit.EventFilter<Boolean>(logEventClass
+            ) {
+                @Override
+                protected Boolean run() {
+                    return true;
+                }
+            }.from(raftActor.path().toString())
+                .message(message)
+                .occurrences(1).exec();
+
+
+    }
+
+    protected void waitUntilLeader(){
+        waitUntilLeader(raftActor);
+    }
+
+    public static void waitUntilLeader(ActorRef actorRef) {
+        FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
+        for(int i = 0; i < 20 * 5; i++) {
+            Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
+            try {
+                FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
+                if(resp.getLeaderActor() != null) {
+                    return;
+                }
+            } catch(TimeoutException e) {
+            } catch(Exception e) {
+                System.err.println("FindLeader threw ex");
+                e.printStackTrace();
+            }
+
+
+            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+        }
+
+        Assert.fail("Leader not found for actorRef " + actorRef.path());
+    }
+
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java
new file mode 100644 (file)
index 0000000..a8f490e
--- /dev/null
@@ -0,0 +1,221 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import akka.persistence.SaveSnapshotSuccess;
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+
+/**
+ * Tests raft actor persistence recovery end-to-end using real RaftActors and behavior communication.
+ *
+ * @author Thomas Pantelis
+ */
+public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
+
+    private MockPayload payload0;
+    private MockPayload payload1;
+
+    @Before
+    public void setup() {
+        follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
+                newFollowerConfigParams());
+
+        peerAddresses = ImmutableMap.<String, String>builder().
+                put(follower1Id, follower1Actor.path().toString()).build();
+
+        leaderConfigParams = newLeaderConfigParams();
+        leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
+
+        follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
+        leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
+
+        leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+    }
+
+    @Test
+    public void testStatePersistedBetweenSnapshotCaptureAndPersist() {
+
+        send2InitialPayloads();
+
+        // Block these messages initially so we can control the sequence.
+        leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
+        leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
+        follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+        MockPayload payload2 = sendPayloadData(leaderActor, "two");
+
+        // This should trigger a snapshot.
+        MockPayload payload3 = sendPayloadData(leaderActor, "three");
+
+        MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
+
+        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
+                leaderCollectorActor, CaptureSnapshot.class);
+
+        // First, deliver the CaptureSnapshot to the leader.
+        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
+        leaderActor.tell(captureSnapshot, leaderActor);
+
+        // Send another payload.
+        MockPayload payload4 = sendPayloadData(leaderActor, "four");
+
+        // Now deliver the AppendEntries to the follower
+        follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
+
+        // Now deliver the CaptureSnapshotReply to the leader.
+        CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(
+                leaderCollectorActor, CaptureSnapshotReply.class);
+        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class);
+        leaderActor.tell(captureSnapshotReply, leaderActor);
+
+        // Wait for snapshot complete.
+        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+        reinstateLeaderActor();
+
+        assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
+        assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
+        assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
+        assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
+
+        assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
+                leaderActor.underlyingActor().getState());
+    }
+
+    @Test
+    public void testStatePersistedBetweenInitiateSnapshotAndCapture() {
+
+        send2InitialPayloads();
+
+        // Block these messages initially so we can control the sequence.
+        leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
+        follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+        MockPayload payload2 = sendPayloadData(leaderActor, "two");
+
+        // This should trigger a snapshot.
+        MockPayload payload3 = sendPayloadData(leaderActor, "three");
+
+        // Send another payload.
+        MockPayload payload4 = sendPayloadData(leaderActor, "four");
+
+        MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
+
+        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
+                leaderCollectorActor, CaptureSnapshot.class);
+
+        // First, deliver the AppendEntries to the follower
+        follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
+
+        // Now deliver the CaptureSnapshot to the leader.
+        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
+        leaderActor.tell(captureSnapshot, leaderActor);
+
+        // Wait for snapshot complete.
+        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+        reinstateLeaderActor();
+
+        assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
+        assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
+        assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
+        assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
+
+        // payloads 2, 3, and 4 were applied after the snapshot was initiated and before it was captured so
+        // were included in the snapshot. They were also included as unapplied entries in the snapshot as
+        // they weren't yet applied to the state at the time the snapshot was initiated. They were applied to the
+        // state on recovery by the ApplyJournalEntries messages which remained in the persisted log.
+        // This is a side effect of trimming the persisted log to the sequence number captured at the time
+        // the snapshot was initiated.
+        assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4, payload2,
+                payload3, payload4), leaderActor.underlyingActor().getState());
+    }
+
+    @Test
+    public void testApplyJournalEntriesPersistedAfterSnapshotPersisted() {
+
+        send2InitialPayloads();
+
+        // Block these messages initially so we can control the sequence.
+        follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+        MockPayload payload2 = sendPayloadData(leaderActor, "two");
+
+        // This should trigger a snapshot.
+        MockPayload payload3 = sendPayloadData(leaderActor, "three");
+
+        // Send another payload.
+        MockPayload payload4 = sendPayloadData(leaderActor, "four");
+
+        MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
+
+        // Wait for snapshot complete.
+        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+        // Now deliver the AppendEntries to the follower
+        follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
+
+        reinstateLeaderActor();
+
+        assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
+        assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
+        assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
+        assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
+
+        assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
+                leaderActor.underlyingActor().getState());
+    }
+
+    private void reinstateLeaderActor() {
+        killActor(leaderActor);
+
+        leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
+
+        leaderActor.underlyingActor().waitForRecoveryComplete();
+
+        leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+    }
+
+    private void send2InitialPayloads() {
+        waitUntilLeader(leaderActor);
+        currentTerm = leaderContext.getTermInformation().getCurrentTerm();
+
+        payload0 = sendPayloadData(leaderActor, "zero");
+        payload1 = sendPayloadData(leaderActor, "one");
+
+        // Verify the leader applies the states.
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2);
+
+        assertEquals("Leader last applied", 1, leaderContext.getLastApplied());
+
+        MessageCollectorActor.clearMessages(leaderCollectorActor);
+        MessageCollectorActor.clearMessages(follower1CollectorActor);
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java
new file mode 100644 (file)
index 0000000..92e384e
--- /dev/null
@@ -0,0 +1,208 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import akka.japi.Procedure;
+import com.google.common.base.Supplier;
+import java.util.Collections;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.internal.matchers.Same;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for ReplicatedLogImpl.
+ *
+ * @author Thomas Pantelis
+ */
+public class ReplicatedLogImplTest {
+    private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
+
+    @Mock
+    private DataPersistenceProvider mockPersistence;
+
+    @Mock
+    private RaftActorBehavior mockBehavior;
+
+    @Mock
+    private SnapshotManager mockSnapshotManager;
+
+    private RaftActorContext context;
+    private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+
+    @Before
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+
+        context = new RaftActorContextImpl(null, null, "test",
+                new ElectionTermImpl(mockPersistence, "test", LOG),
+                -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG)  {
+            @Override
+            public SnapshotManager getSnapshotManager() {
+                return mockSnapshotManager;
+            }
+        };
+    }
+
+    private void verifyPersist(Object message) throws Exception {
+        verifyPersist(message, new Same(message));
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private void verifyPersist(Object message, Matcher<?> matcher) throws Exception {
+        ArgumentCaptor<Procedure> procedure = ArgumentCaptor.forClass(Procedure.class);
+        verify(mockPersistence).persist(Matchers.argThat(matcher), procedure.capture());
+
+        procedure.getValue().apply(message);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAppendAndPersistExpectingNoCapture() throws Exception {
+        ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior);
+
+        MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 1, new MockPayload("1"));
+
+        log.appendAndPersist(logEntry);
+
+        verifyPersist(logEntry);
+
+        assertEquals("size", 1, log.size());
+
+        reset(mockPersistence);
+
+        Procedure<ReplicatedLogEntry> mockCallback = Mockito.mock(Procedure.class);
+        log.appendAndPersist(logEntry, mockCallback);
+
+        verifyPersist(logEntry);
+
+        verify(mockCallback).apply(same(logEntry));
+        verifyNoMoreInteractions(mockSnapshotManager);
+
+        assertEquals("size", 2, log.size());
+    }
+
+    @Test
+    public void testAppendAndPersistExpectingCaptureDueToJournalCount() throws Exception {
+        configParams.setSnapshotBatchCount(2);
+
+        doReturn(1L).when(mockBehavior).getReplicatedToAllIndex();
+
+        ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior);
+
+        MockReplicatedLogEntry logEntry1 = new MockReplicatedLogEntry(1, 2, new MockPayload("2"));
+        MockReplicatedLogEntry logEntry2 = new MockReplicatedLogEntry(1, 3, new MockPayload("3"));
+
+        log.appendAndPersist(logEntry1);
+        verifyPersist(logEntry1);
+
+        verifyNoMoreInteractions(mockSnapshotManager);
+        reset(mockPersistence);
+
+        log.appendAndPersist(logEntry2);
+        verifyPersist(logEntry2);
+
+        verify(mockSnapshotManager).capture(same(logEntry2), eq(1L));
+
+        assertEquals("size", 2, log.size());
+    }
+
+    @Test
+    public void testAppendAndPersistExpectingCaptureDueToDataSize() throws Exception {
+        doReturn(1L).when(mockBehavior).getReplicatedToAllIndex();
+
+        context.setTotalMemoryRetriever(new Supplier<Long>() {
+            @Override
+            public Long get() {
+                return 100L;
+            }
+        });
+
+        ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior);
+
+        int dataSize = 600;
+        MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 2, new MockPayload("2", dataSize));
+
+        doReturn(true).when(mockSnapshotManager).capture(same(logEntry), eq(1L));
+
+        log.appendAndPersist(logEntry);
+        verifyPersist(logEntry);
+
+        verify(mockSnapshotManager).capture(same(logEntry), eq(1L));
+
+        reset(mockPersistence, mockSnapshotManager);
+
+        logEntry = new MockReplicatedLogEntry(1, 3, new MockPayload("3", 5));
+
+        log.appendAndPersist(logEntry);
+        verifyPersist(logEntry);
+
+        verifyNoMoreInteractions(mockSnapshotManager);
+
+        assertEquals("size", 2, log.size());
+    }
+
+    @Test
+    public void testRemoveFromAndPersist() throws Exception {
+
+        ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior);
+
+        log.append(new MockReplicatedLogEntry(1, 0, new MockPayload("0")));
+        log.append(new MockReplicatedLogEntry(1, 1, new MockPayload("1")));
+        log.append(new MockReplicatedLogEntry(1, 2, new MockPayload("2")));
+
+        log.removeFromAndPersist(1);
+
+        DeleteEntries deleteEntries = new DeleteEntries(1);
+        verifyPersist(deleteEntries, match(deleteEntries));
+
+        assertEquals("size", 1, log.size());
+
+        reset(mockPersistence);
+
+        log.removeFromAndPersist(1);
+
+        verifyNoMoreInteractions(mockPersistence);
+    }
+
+    public Matcher<DeleteEntries> match(final DeleteEntries actual){
+        return new BaseMatcher<DeleteEntries>() {
+            @Override
+            public boolean matches(Object o) {
+                DeleteEntries other = (DeleteEntries) o;
+                return actual.getFromIndex() == other.getFromIndex();
+            }
+
+            @Override
+            public void describeTo(Description description) {
+                description.appendText("DeleteEntries: fromIndex: " + actual.getFromIndex());
+            }
+        };
+    }
+}
index bd670fd..c74705d 100644 (file)
@@ -14,7 +14,7 @@ import java.util.List;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
@@ -36,13 +36,14 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
     private MockPayload recoveredPayload0;
     private MockPayload recoveredPayload1;
     private MockPayload recoveredPayload2;
+    private MockPayload payload3;
     private MockPayload payload4;
     private MockPayload payload5;
     private MockPayload payload6;
     private MockPayload payload7;
 
     @Test
-    public void runTest() {
+    public void runTest() throws Exception {
         testLog.info("testReplicationAndSnapshots starting");
 
         // Setup the persistent journal for the leader. We'll start up with 3 journal log entries (one less
@@ -55,7 +56,7 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(1, initialTerm, recoveredPayload1));
         recoveredPayload2 = new MockPayload("two");
         InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(2, initialTerm, recoveredPayload2));
-        InMemoryJournal.addEntry(leaderId, seqId++, new ApplyLogEntries(2));
+        InMemoryJournal.addEntry(leaderId, seqId++, new ApplyJournalEntries(2));
 
         origLeaderJournal = InMemoryJournal.get(leaderId, ReplicatedLogImplEntry.class);
 
@@ -157,19 +158,21 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
      * 4 and we already have 3 entries in the journal log, this should initiate a snapshot. In this
      * scenario, the follower consensus and application of state is delayed until after the snapshot
      * completes.
+     * @throws Exception
      */
-    private void testFirstSnapshot() {
+    private void testFirstSnapshot() throws Exception {
         testLog.info("testFirstSnapshot starting");
 
-        byte[] snapshot = new byte[] {1,2,3,4};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
+        expSnapshotState.add(recoveredPayload0);
+        expSnapshotState.add(recoveredPayload1);
+        expSnapshotState.add(recoveredPayload2);
 
         // Delay the consensus by temporarily dropping the AppendEntries to both followers.
         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
 
         // Send the payload.
-        MockPayload payload3 = sendPayloadData(leaderActor, "three");
+        payload3 = sendPayloadData(leaderActor, "three");
 
         // Wait for snapshot complete.
         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
@@ -185,7 +188,7 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         // the last applied log entry (2) even though the leader hasn't yet advanced its cached snapshot index.
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
-        verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3, snapshot);
+        verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3);
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3);
@@ -286,12 +289,15 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
     /**
      * Send one more payload to trigger another snapshot. In this scenario, we delay the snapshot until
      * consensus occurs and the leader applies the state.
+     * @throws Exception
      */
-    private void testSecondSnapshot() {
+    private void testSecondSnapshot() throws Exception {
         testLog.info("testSecondSnapshot starting");
 
-        byte[] snapshot = new byte[] {5,6,7,8};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
+        expSnapshotState.add(payload3);
+        expSnapshotState.add(payload4);
+        expSnapshotState.add(payload5);
+        expSnapshotState.add(payload6);
 
         // Delay the CaptureSnapshot message to the leader actor.
         leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
@@ -341,11 +347,14 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
         assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
 
-        // Verify the persisted snapshot. This should reflect the advanced snapshot index as the last applied
-        // log entry (6).
+        expSnapshotState.add(payload7);
+
+        // Verify the persisted snapshot. This should reflect the snapshot index as the last applied
+        // log entry (7) and shouldn't contain any unapplied entries as we capture persisted the snapshot data
+        // when the snapshot is created (ie when the CaptureSnapshot is processed).
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
-        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7, snapshot);
+        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7);
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 7, payload7);
@@ -404,6 +413,8 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
 
         leaderActor.underlyingActor().waitForRecoveryComplete();
 
+        leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+
         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
         assertEquals("Leader snapshot index", 6, leaderContext.getReplicatedLog().getSnapshotIndex());
         assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
index d4a9f77..ff9b8ce 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import akka.persistence.SaveSnapshotSuccess;
 import com.google.common.collect.ImmutableMap;
@@ -185,6 +184,10 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         MessageCollectorActor.clearMessages(follower1CollectorActor);
         MessageCollectorActor.clearMessages(follower2CollectorActor);
 
+        expSnapshotState.add(payload0);
+        expSnapshotState.add(payload1);
+        expSnapshotState.add(payload2);
+
         testLog.info("testInitialReplications complete");
     }
 
@@ -198,8 +201,6 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         testLog.info("testSubsequentReplicationsAndSnapshots starting: sending first payload, replicatedToAllIndex: {}",
                 leader.getReplicatedToAllIndex());
 
-        leaderActor.underlyingActor().setSnapshot(new byte[] {2});
-
         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
 
         // Send the first payload - this should cause the first snapshot.
@@ -207,8 +208,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
 
-        byte[] snapshot = new byte[] {6};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
+        expSnapshotState.add(payload3);
 
         testLog.info("testSubsequentReplicationsAndSnapshots: sending 4 more payloads");
 
@@ -273,7 +273,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         // Verify the leader's persisted snapshot.
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
-        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 3, currentTerm, 7, snapshot);
+        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 3, currentTerm, 7);
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 4, unAppliedEntry.size());
         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 4, payload4);
@@ -313,6 +313,11 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         MessageCollectorActor.clearMessages(follower1CollectorActor);
         MessageCollectorActor.clearMessages(follower2CollectorActor);
 
+        expSnapshotState.add(payload4);
+        expSnapshotState.add(payload5);
+        expSnapshotState.add(payload6);
+        expSnapshotState.add(payload7);
+
         testLog.info("testSubsequentReplicationsAndSnapshots complete");
     }
 
@@ -327,8 +332,6 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
                 leader.getReplicatedToAllIndex());
 
         leaderActor.underlyingActor().setMockTotalMemory(1000);
-        byte[] snapshot = new byte[] {6};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
 
         // We'll expect a ReplicatedLogImplEntry message and an ApplyJournalEntries message added to the journal.
         InMemoryJournal.addWriteMessagesCompleteLatch(leaderId, 2);
@@ -351,6 +354,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         CaptureSnapshot captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor, CaptureSnapshot.class);
         Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot);
 
+        expSnapshotState.add(payload8);
+
         // Send another payload with a large enough relative size in combination with the last payload
         // that exceeds the memory threshold (70% * 1000 = 700) - this should do a snapshot.
         payload9 = sendPayloadData(leaderActor, "nine", 201);
@@ -383,7 +388,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         // Verify the leader's persisted snapshot.
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
-        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 8, currentTerm, 9, snapshot);
+        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 8, currentTerm, 9);
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 9, payload9);
@@ -451,6 +456,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         MessageCollectorActor.clearMessages(leaderCollectorActor);
         MessageCollectorActor.clearMessages(follower1CollectorActor);
         MessageCollectorActor.clearMessages(follower2CollectorActor);
+
+        expSnapshotState.add(payload10);
     }
 
     /**
@@ -467,8 +474,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         InstallSnapshot installSnapshot;
         InstallSnapshotReply installSnapshotReply;
 
-        byte[] snapshot = new byte[] {10};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
+        expSnapshotState.add(payload9);
 
         // Now stop dropping AppendEntries in follower 2.
         follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
@@ -480,7 +486,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         assertEquals("InstallSnapshot getTotalChunks", 1, installSnapshot.getTotalChunks());
         assertEquals("InstallSnapshot getLastIncludedTerm", currentTerm, installSnapshot.getLastIncludedTerm());
         assertEquals("InstallSnapshot getLastIncludedIndex", 8, installSnapshot.getLastIncludedIndex());
-        assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
+        //assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
 
         installSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, InstallSnapshotReply.class);
         assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
@@ -490,7 +496,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         // Verify follower 2 applies the snapshot.
         applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
-        verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 8, currentTerm, 8, snapshot);
+        verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 8, currentTerm, 8);
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, applySnapshot.getSnapshot().getUnAppliedEntries().size());
 
         // Verify follower 2 only applies the second log entry (9) as the first one (8) was in the snapshot.
@@ -523,7 +529,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
         Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
-        verifySnapshot("Persisted", persistedSnapshot, currentTerm, 9, currentTerm, 9, snapshot);
+        verifySnapshot("Persisted", persistedSnapshot, currentTerm, 9, currentTerm, 9);
         unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
 
@@ -535,16 +541,14 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
     /**
      * Do another round of payloads and snapshot to verify replicatedToAllIndex gets back on track and
      * snapshots works as expected after doing a follower snapshot. In this step we don't lag a follower.
+     * @throws Exception
      */
-    private void testFinalReplicationsAndSnapshot() {
+    private void testFinalReplicationsAndSnapshot() throws Exception {
         List<ApplyState> applyStates;
         ApplyState applyState;
 
         testLog.info("testFinalReplicationsAndSnapshot starting: replicatedToAllIndex: {}", leader.getReplicatedToAllIndex());
 
-        byte[] snapshot = new byte[] {14};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
-
         // Send another payload - a snapshot should occur.
         payload11 = sendPayloadData(leaderActor, "eleven");
 
@@ -557,7 +561,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         // Verify the leader's last persisted snapshot (previous ones may not be purged yet).
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
-        verifySnapshot("Persisted", persistedSnapshot, currentTerm, 10, currentTerm, 11, snapshot);
+        verifySnapshot("Persisted", persistedSnapshot, currentTerm, 10, currentTerm, 11);
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 11, payload11);
index 5a0d5ae..8ab762f 100644 (file)
@@ -69,6 +69,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         doReturn(10L).when(mockConfigParams).getSnapshotBatchCount();
         doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog();
         doReturn("123").when(mockRaftActorContext).getId();
+        doReturn(mockDataPersistenceProvider).when(mockRaftActorContext).getPersistenceProvider();
         doReturn("123").when(mockRaftActorBehavior).getLeaderId();
 
         ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
@@ -384,6 +385,8 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     @Test
     public void testCommit(){
+        doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
+
         // when replicatedToAllIndex = -1
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
@@ -397,7 +400,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         verify(mockReplicatedLog).snapshotCommit();
 
-        verify(mockDataPersistenceProvider).deleteMessages(100L);
+        verify(mockDataPersistenceProvider).deleteMessages(50L);
 
         ArgumentCaptor<SnapshotSelectionCriteria> criteriaCaptor = ArgumentCaptor.forClass(SnapshotSelectionCriteria.class);
 
@@ -438,6 +441,8 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     @Test
     public void testCallingCommitMultipleTimesCausesNoHarm(){
+        doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
+
         // when replicatedToAllIndex = -1
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
@@ -453,7 +458,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         verify(mockReplicatedLog, times(1)).snapshotCommit();
 
-        verify(mockDataPersistenceProvider, times(1)).deleteMessages(100L);
+        verify(mockDataPersistenceProvider, times(1)).deleteMessages(50L);
 
         verify(mockDataPersistenceProvider, times(1)).deleteSnapshots(any(SnapshotSelectionCriteria.class));
     }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/base/messages/DeleteEntriesTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/base/messages/DeleteEntriesTest.java
new file mode 100644 (file)
index 0000000..55d2bcc
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for DeleteEntries.
+ *
+ * @author Thomas Pantelis
+ */
+public class DeleteEntriesTest {
+
+    @Test
+    public void testSerialization() {
+
+        DeleteEntries deleteEntries = new DeleteEntries(11);
+
+        DeleteEntries clone = (DeleteEntries) SerializationUtils.clone(deleteEntries);
+
+        Assert.assertEquals("getFromIndex", 11, clone.getFromIndex());
+    }
+}
index 0737d75..d482e28 100644 (file)
@@ -216,6 +216,7 @@ public class InMemoryJournal extends AsyncWriteJournal {
 
     @Override
     public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) {
+        LOG.trace("doAsyncDeleteMessagesTo: {}", toSequenceNr);
         Map<Long, Object> journal = journals.get(persistenceId);
         if(journal != null) {
             synchronized (journal) {
index 678ac34..3dc6e40 100644 (file)
@@ -65,7 +65,17 @@ public interface DataObjectModification<T extends DataObject> extends org.openda
     @Nonnull ModificationType getModificationType();
 
     /**
-     * Returns after state of top level container.
+     * Returns before-state of top level container. Implementations are encouraged,
+     * but not required to provide this state.
+     *
+     * @param root Class representing data container
+     * @return State of object before modification. Null if subtree was not present,
+     *         or the implementation cannot provide the state.
+     */
+    @Nullable T getDataBefore();
+
+    /**
+     * Returns after-state of top level container.
      *
      * @param root Class representing data container
      * @return State of object after modification. Null if subtree is not present.
index c1c23d5..b86d31b 100644 (file)
@@ -44,7 +44,7 @@ public final class DataTreeIdentifier<T extends DataObject> implements Immutable
      *
      * @return Instance identifier corresponding to the root node.
      */
-    public @Nonnull InstanceIdentifier<?> getRootIdentifier() {
+    public @Nonnull InstanceIdentifier<T> getRootIdentifier() {
         return rootIdentifier;
     }
 
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDOMNotificationListenerAdapter.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDOMNotificationListenerAdapter.java
new file mode 100644 (file)
index 0000000..03da296
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.impl;
+
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
+import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory;
+import org.opendaylight.yangtools.binding.data.codec.api.BindingNormalizedNodeSerializer;
+import org.opendaylight.yangtools.yang.binding.Notification;
+
+class BindingDOMNotificationListenerAdapter implements DOMNotificationListener {
+
+    private final NotificationInvokerFactory.NotificationInvoker invoker;
+    private final BindingNormalizedNodeSerializer codec;
+
+    publi