Merge "Bug 1025: Fixed incorrect revision in sal-remote-augment, which caused log...
authorEd Warnicke <eaw@cisco.com>
Mon, 22 Sep 2014 18:31:51 +0000 (18:31 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 22 Sep 2014 18:31:51 +0000 (18:31 +0000)
109 files changed:
features/mdsal/pom.xml
features/mdsal/src/main/resources/features.xml
opendaylight/archetypes/opendaylight-karaf-distro-archetype/pom.xml
opendaylight/archetypes/opendaylight-karaf-features/pom.xml
opendaylight/commons/opendaylight/pom.xml
opendaylight/config/config-persister-feature-adapter/src/main/java/org/opendaylight/controller/configpusherfeature/internal/FeatureConfigPusher.java
opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/setenv.bat [new file with mode: 0644]
opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/etc/custom.properties
opendaylight/distribution/opendaylight/src/main/resources/configuration/config.ini
opendaylight/karaf-branding/pom.xml
opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/ComponentActivator.java
opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/InventoryAndReadAdapter.java
opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/NodeMapping.java
opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/topology/TopologyMapping.java
opendaylight/md-sal/compatibility/sal-compatibility/src/test/java/org/opendaylight/controller/sal/compatibility/test/NodeMappingTest.java
opendaylight/md-sal/compatibility/sal-compatibility/src/test/java/org/opendaylight/controller/sal/compatibility/topology/test/TopologyMappingTest.java
opendaylight/md-sal/forwardingrules-manager/src/main/java/org/opendaylight/controller/frm/impl/FlowForwarder.java
opendaylight/md-sal/pom.xml
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyLogEntries.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockAkkaJournal.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/resources/application.conf
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingTranslatedTransactionChain.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/NormalizedNodeSerializer.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/PathArgumentSerializer.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueSerializer.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueType.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemoryJournal.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemorySnapshotStore.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/HashMapDataStoreModule.java [deleted file]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/HashMapDataStoreModuleFactory.java [deleted file]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerTransactionChainImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorInvoker.java [deleted file]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorListener.java [deleted file]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitExecutor.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/HashMapDataStore.java [deleted file]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/HashMapDataStoreTransaction.java [deleted file]
opendaylight/md-sal/sal-dom-broker/src/main/yang/opendaylight-dom-broker-impl.yang
opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/xsql/XSQLProvider.java
opendaylight/md-sal/sal-karaf-xsql/pom.xml
opendaylight/md-sal/sal-karaf-xsql/src/main/resources/OSGI-INF/blueprint/shell-log.xml [new file with mode: 0644]
opendaylight/md-sal/samples/l2switch/pom.xml
opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporter.java
opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java
opendaylight/md-sal/topology-manager/src/test/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporterTest.java
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/mapping/attributes/fromxml/ObjectNameAttributeReadingStrategy.java
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/mapping/attributes/fromxml/SimpleIdentityRefAttributeReadingStrategy.java
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/mapping/config/Config.java
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/mapping/config/ServiceRegistryWrapper.java
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/mapping/config/Services.java
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/operations/editconfig/EditConfig.java
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/operations/get/Get.java
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/operations/runtimerpc/RuntimeRpc.java
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/transactions/TransactionProvider.java
opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusherImpl.java
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/osgi/NetconfMonitoringServiceImpl.java
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/osgi/NetconfOperationRouterImpl.java
opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java
opendaylight/netconf/netconf-monitoring/src/main/java/org/opendaylight/controller/netconf/monitoring/xml/model/MonitoringSchema.java
opendaylight/netconf/netconf-monitoring/src/test/java/org/opendaylight/controller/netconf/monitoring/xml/JaxBSerializerTest.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHanderReader.java [new file with mode: 0644]
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java [new file with mode: 0644]
opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerTest.java
opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/osgi/NetconfSSHActivator.java
opendaylight/netconf/netconf-usermanager/src/main/java/org/opendaylight/controller/netconf/auth/usermanager/AuthProviderImpl.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/mapping/AbstractNetconfOperation.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageUtil.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/osgi/NetconfConfigUtil.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/xml/XmlElement.java
opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/mapping/AbstractLastNetconfOperationTest.java [new file with mode: 0644]
opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/mapping/AbstractNetconfOperationTest.java [new file with mode: 0644]
opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/mapping/AbstractSingletonNetconfOperationTest.java [new file with mode: 0644]
opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/messages/NetconfHelloMessageAdditionalHeaderTest.java [new file with mode: 0644]
opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/messages/NetconfHelloMessageTest.java [new file with mode: 0644]
opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageHeaderTest.java [new file with mode: 0644]
opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageUtilTest.java [new file with mode: 0644]
opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/messages/SendErrorExceptionUtilTest.java [new file with mode: 0644]
opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/osgi/NetconfConfigUtilTest.java [new file with mode: 0644]
opendaylight/northbound/networkconfiguration/neutron/pom.xml
opendaylight/northbound/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/northbound/NeutronLoadBalancerPoolNorthbound.java

index f45f680..299e5b6 100644 (file)
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-dom-xsql</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-karaf-xsql</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-dom-xsql-config</artifactId>
index 0e24176..2a988ce 100644 (file)
@@ -65,6 +65,7 @@
     <feature name ='odl-mdsal-xsql' version='${project.version}'>
         <feature version='${project.version}'>odl-mdsal-broker</feature>
         <bundle>mvn:org.opendaylight.controller/sal-dom-xsql/${project.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/sal-karaf-xsql/${project.version}</bundle>
         <configfile finalname="${config.configfile.directory}/${config.xsql.configfile}">mvn:org.opendaylight.controller/sal-dom-xsql-config/${project.version}/xml/config</configfile>
     </feature>
     <feature name ='odl-mdsal-apidocs' version='${project.version}'>
index 8883c64..9081ce7 100644 (file)
@@ -1,7 +1,11 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
-
+  <parent>
+     <groupId>org.opendaylight.controller.archetypes</groupId>
+     <artifactId>archetypes-parent</artifactId>
+     <version>0.1.1-SNAPSHOT</version>
+  </parent>
   <groupId>org.opendaylight.controller</groupId>
   <artifactId>opendaylight-karaf-distro-archetype</artifactId>
   <version>1.0.0-SNAPSHOT</version>
index 4973a69..264402a 100644 (file)
@@ -2,6 +2,11 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
 
+  <parent>
+     <groupId>org.opendaylight.controller.archetypes</groupId>
+     <artifactId>archetypes-parent</artifactId>
+     <version>0.1.1-SNAPSHOT</version>
+  </parent>
   <groupId>org.opendaylight.controller</groupId>
   <artifactId>opendaylight-karaf-features-archetype</artifactId>
   <version>1.0.0-SNAPSHOT</version>
index e3ffbf3..a98afc8 100644 (file)
         <artifactId>sal-dom-xsql</artifactId>
         <version>${mdsal.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>sal-karaf-xsql</artifactId>
+        <version>${mdsal.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.opendaylight.controller</groupId>
         <artifactId>sal-dom-xsql-config</artifactId>
index 1c094ad..57052f9 100644 (file)
@@ -26,6 +26,7 @@ import com.google.common.collect.LinkedHashMultimap;
  */
 public class FeatureConfigPusher {
     private static final Logger logger = LoggerFactory.getLogger(FeatureConfigPusher.class);
+    private static final int MAX_RETRIES=100;
     private FeaturesService featuresService = null;
     private ConfigPusher pusher = null;
     /*
@@ -82,7 +83,29 @@ public class FeatureConfigPusher {
     }
 
     private boolean isInstalled(Feature feature) {
-        List<Feature> installedFeatures = Arrays.asList(featuresService.listInstalledFeatures());
+        List<Feature> installedFeatures= null;
+        boolean cont = true;
+        int retries = 0;
+        while(cont) {
+            try {
+                installedFeatures = Arrays.asList(featuresService.listInstalledFeatures());
+                break;
+            } catch (Exception e) {
+                if(retries < MAX_RETRIES) {
+                    logger.warn("Karaf featuresService.listInstalledFeatures() has thrown an exception, retry {}, Exception {}", retries,e);
+                    try {
+                        Thread.sleep(1);
+                    } catch (InterruptedException e1) {
+                        throw new IllegalStateException(e1);
+                    }
+                    retries++;
+                    continue;
+                } else {
+                    logger.error("Giving up on Karaf featuresService.listInstalledFeatures() which has thrown an exception, retry {}, Exception {}", retries,e);
+                    throw e;
+                }
+            }
+        }
         return installedFeatures.contains(feature);
     }
 
diff --git a/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/setenv.bat b/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/setenv.bat
new file mode 100644 (file)
index 0000000..7c61920
--- /dev/null
@@ -0,0 +1,64 @@
+@echo off
+rem
+rem
+rem    Licensed to the Apache Software Foundation (ASF) under one or more
+rem    contributor license agreements.  See the NOTICE file distributed with
+rem    this work for additional information regarding copyright ownership.
+rem    The ASF licenses this file to You under the Apache License, Version 2.0
+rem    (the "License"); you may not use this file except in compliance with
+rem    the License.  You may obtain a copy of the License at
+rem
+rem       http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem    Unless required by applicable law or agreed to in writing, software
+rem    distributed under the License is distributed on an "AS IS" BASIS,
+rem    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem    See the License for the specific language governing permissions and
+rem    limitations under the License.
+rem
+
+rem
+rem handle specific scripts; the SCRIPT_NAME is exactly the name of the Karaf
+rem script; for example karaf.bat, start.bat, stop.bat, admin.bat, client.bat, ...
+rem
+rem if "%KARAF_SCRIPT%" == "SCRIPT_NAME" (
+rem   Actions go here...
+rem )
+
+rem
+rem general settings which should be applied for all scripts go here; please keep
+rem in mind that it is possible that scripts might be executed more than once, e.g.
+rem in example of the start script where the start script is executed first and the
+rem karaf script afterwards.
+rem
+
+rem
+rem The following section shows the possible configuration options for the default
+rem karaf scripts
+rem
+rem Window name of the windows console
+rem SET KARAF_TITLE
+rem Location of Java installation
+rem SET JAVA_HOME
+rem Minimum memory for the JVM
+rem SET JAVA_MIN_MEM
+rem Maximum memory for the JVM
+rem SET JAVA_MAX_MEM
+rem Minimum perm memory for the JVM
+rem SET JAVA_PERM_MEM
+rem Maximum perm memory for the JVM
+rem SET JAVA_MAX_PERM_MEM
+rem Karaf home folder
+rem SET KARAF_HOME
+rem Karaf data folder
+rem SET KARAF_DATA
+rem Karaf base folder
+rem SET KARAF_BASE
+rem Karaf etc folder
+rem SET KARAF_ETC
+rem Additional available Karaf options
+rem SET KARAF_OPTS
+rem Enable debug mode
+rem SET KARAF_DEBUG
+IF "%JAVA_MAX_PERM_MEM%"=="" SET JAVA_MAX_PERM_MEM=512m
+IF "%JAVA_MAX_MEM%"=="" SET JAVA_MAX_MEM=2048m
index cdb6542..4a8f5ae 100644 (file)
@@ -94,6 +94,10 @@ ovsdb.listenPort=6640
 # default Openflow version = 1.0, we also support 1.3.
 # ovsdb.of.version=1.3
 
+# ovsdb can be configured with ml2 to perform l3 forwarding. The config below enables that functionality, which is
+# disabled by default.
+# ovsdb.l3.fwd.enabled=yes
+
 # ovsdb can be configured with ml2 to perform l3 forwarding. When used in that scenario, the mac address of the default
 # gateway --on the external subnet-- is expected to be resolved from its inet address. The config below overrides that
 # specific arp/neighDiscovery lookup.
index 530e46e..691d83d 100644 (file)
@@ -116,6 +116,10 @@ ovsdb.listenPort=6640
 # default Openflow version = 1.3, we also support 1.0.
 ovsdb.of.version=1.3
 
+# ovsdb can be configured with ml2 to perform l3 forwarding. The config below enables that functionality, which is
+# disabled by default.
+# ovsdb.l3.fwd.enabled=yes
+
 # ovsdb can be configured with ml2 to perform l3 forwarding. When used in that scenario, the mac address of the default
 # gateway --on the external subnet-- is expected to be resolved from its inet address. The config below overrides that
 # specific arp/neighDiscovery lookup.
index 727f224..444e770 100644 (file)
@@ -2,7 +2,12 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
     <modelVersion>4.0.0</modelVersion>
-
+    <parent>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>releasepom</artifactId>
+      <version>0.1.2-SNAPSHOT</version>
+      <relativePath>../..</relativePath>
+    </parent>
     <groupId>org.opendaylight.controller</groupId>
     <artifactId>karaf.branding</artifactId>
     <version>1.0.0-SNAPSHOT</version>
index d71858e..4fc0cf7 100644 (file)
@@ -139,6 +139,7 @@ public class ComponentActivator extends ComponentActivatorAbstractBase {
     protected Object[] getImplementations() {
         return new Object[] {
                 dataPacketService,
+                inventory,
         };
     }
 
@@ -148,6 +149,8 @@ public class ComponentActivator extends ComponentActivatorAbstractBase {
             _instanceConfigure((ComponentActivator)imp, c, containerName);
         } else if (imp instanceof DataPacketServiceAdapter) {
             _instanceConfigure((DataPacketServiceAdapter)imp, c, containerName);
+        } else if (imp instanceof InventoryAndReadAdapter) {
+            _instanceConfigure((InventoryAndReadAdapter)imp, c, containerName);
         } else {
             throw new IllegalArgumentException(String.format("Unhandled implementation class %s", imp.getClass()));
         }
@@ -215,6 +218,22 @@ public class ComponentActivator extends ComponentActivatorAbstractBase {
                 .setRequired(false));
     }
 
+    private void _instanceConfigure(final InventoryAndReadAdapter imp, final Component it, String containerName) {
+        it.setInterface(new String[] {
+                IPluginInInventoryService.class.getName(),
+                IPluginInReadService.class.getName(),
+        }, properties());
+
+        it.add(createServiceDependency()
+                .setService(IPluginOutReadService.class)
+                .setCallbacks("setReadPublisher", "unsetReadPublisher")
+                .setRequired(false));
+        it.add(createServiceDependency()
+                .setService(IPluginOutInventoryService.class)
+                .setCallbacks("setInventoryPublisher", "unsetInventoryPublisher")
+                .setRequired(false));
+    }
+
     private void _configure(final TopologyAdapter imp, final Component it) {
         it.setInterface(IPluginInTopologyService.class.getName(), properties());
 
index e2c1386..1530e90 100644 (file)
@@ -48,7 +48,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.ta
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsUpdate;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
@@ -241,20 +240,20 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI
      * @param id Table id
      * @return Table contents, or null if not present
      */
-    private Table readConfigTable(final Node node, final short id) {
+    private Table readOperationalTable(final Node node, final short id) {
         final InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class)
-                .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class, InventoryMapping.toNodeKey(node))
+                .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class, NodeMapping.toNodeKey(node))
                 .augmentation(FlowCapableNode.class)
                 .child(Table.class, new TableKey(id))
                 .build();
 
-        return (Table) startChange().readConfigurationData(tableRef);
+        return (Table) startChange().readOperationalData(tableRef);
     }
 
     @Override
     public List<FlowOnNode> readAllFlow(final Node node, final boolean cached) {
         final ArrayList<FlowOnNode> output = new ArrayList<>();
-        final Table table = readConfigTable(node, OPENFLOWV10_TABLE_ID);
+        final Table table = readOperationalTable(node, OPENFLOWV10_TABLE_ID);
         if (table != null) {
             final List<Flow> flows = table.getFlow();
             LOG.trace("Number of flows installed in table 0 of node {} : {}", node, flows.size());
@@ -268,12 +267,6 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI
             }
         }
 
-        // TODO (main): Shall we send request to the switch? It will make async request to the switch.
-        // Once the plugin receives a response, it will let the adaptor know through onFlowStatisticsUpdate()
-        // If we assume that md-sal statistics manager will always be running, then it is not required
-        // But if not, then sending request will collect the latest data for adaptor at least.
-        getFlowStatisticsService().getAllFlowsStatisticsFromAllFlowTables(
-                new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder().setNode(NodeMapping.toNodeRef(node)).build());
         return output;
     }
 
@@ -334,7 +327,7 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI
     @Override
     public FlowOnNode readFlow(final Node node, final org.opendaylight.controller.sal.flowprogrammer.Flow targetFlow, final boolean cached) {
         FlowOnNode ret = null;
-        final Table table = readConfigTable(node, OPENFLOWV10_TABLE_ID);
+        final Table table = readOperationalTable(node, OPENFLOWV10_TABLE_ID);
         if (table != null) {
             final List<Flow> flows = table.getFlow();
             InventoryAndReadAdapter.LOG.trace("Number of flows installed in table 0 of node {} : {}", node, flows.size());
@@ -386,7 +379,7 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI
     @Override
     public NodeTableStatistics readNodeTable(final NodeTable nodeTable, final boolean cached) {
         NodeTableStatistics nodeStats = null;
-        final Table table = readConfigTable(nodeTable.getNode(), (short) nodeTable.getID());
+        final Table table = readOperationalTable(nodeTable.getNode(), (short) nodeTable.getID());
         if (table != null) {
             final FlowTableStatisticsData tableStats = table.getAugmentation(FlowTableStatisticsData.class);
             if (tableStats != null) {
index b873f8a..bcb2367 100644 (file)
@@ -168,7 +168,22 @@ public final class NodeMapping {
      * @return
      */
     private static NodeId toNodeId(org.opendaylight.controller.sal.core.Node aDNode) {
-        return new NodeId(aDNode.getType() + ":" + String.valueOf(aDNode.getID()));
+        String targetPrefix = null;
+        if (NodeIDType.OPENFLOW.equals(aDNode.getType())) {
+                targetPrefix = OPENFLOW_ID_PREFIX;
+        } else {
+            targetPrefix = aDNode.getType() + ":";
+        }
+
+        return new NodeId(targetPrefix + String.valueOf(aDNode.getID()));
+    }
+
+    /**
+     * @param aDNode
+     * @return md-sal {@link NodeKey}
+     */
+    public static NodeKey toNodeKey(org.opendaylight.controller.sal.core.Node aDNode) {
+        return new NodeKey(toNodeId(aDNode));
     }
 
     public static String toNodeConnectorType(final NodeConnectorId ncId, final NodeId nodeId) {
index 6bc669f..ae723a3 100644 (file)
@@ -7,12 +7,8 @@
  */
 package org.opendaylight.controller.sal.compatibility.topology;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
 import org.opendaylight.controller.md.sal.binding.util.TypeSafeDataReader;
 import org.opendaylight.controller.sal.compatibility.NodeMapping;
 import org.opendaylight.controller.sal.core.ConstructionException;
@@ -33,11 +29,16 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.regex.Pattern;
+
+import static com.google.common.base.Preconditions.checkNotNull;
 
 public final class TopologyMapping {
     private static final Logger LOG = LoggerFactory.getLogger(TopologyMapping.class);
+    private final static Pattern NUMBERS_ONLY = Pattern.compile("[0-9]+");
 
     private TopologyMapping() {
         throw new UnsupportedOperationException("Utility class. Instantiation is not allowed.");
@@ -100,7 +101,13 @@ public final class TopologyMapping {
 
     public static NodeConnector toADNodeConnector(final TpId source, final NodeId nodeId) throws ConstructionException {
         checkNotNull(source);
-        return new NodeConnector(NodeConnectorIDType.OPENFLOW, Short.valueOf(toADNodeConnectorId(source)), toADNode(nodeId));
+        String nodeConnectorIdStripped = toADNodeConnectorId(source);
+        if (NUMBERS_ONLY.matcher(nodeConnectorIdStripped).matches()) {
+            return new NodeConnector(NodeConnectorIDType.OPENFLOW, Short.valueOf(nodeConnectorIdStripped), toADNode(nodeId));
+        }
+        LOG.debug("NodeConnectorId does not match openflow id type, using " + NodeMapping.MD_SAL_TYPE +  "instead");
+        NodeConnectorIDType.registerIDType(NodeMapping.MD_SAL_TYPE, String.class, NodeMapping.MD_SAL_TYPE);
+        return new NodeConnector(NodeMapping.MD_SAL_TYPE, nodeConnectorIdStripped, toADNode(nodeId));
     }
 
     public static String toADNodeConnectorId(final TpId nodeConnectorId) {
@@ -109,6 +116,12 @@ public final class TopologyMapping {
 
     public static Node toADNode(final NodeId nodeId) throws ConstructionException {
         checkNotNull(nodeId);
-        return new Node(NodeIDType.OPENFLOW, Long.valueOf(toADNodeId(nodeId)));
+        String nodeIdStripped = toADNodeId(nodeId);
+        if (NUMBERS_ONLY.matcher(nodeIdStripped).matches()) {
+            return new Node(NodeIDType.OPENFLOW, Long.valueOf(nodeIdStripped));
+        }
+        LOG.debug("NodeId does not match openflow id type, using " + NodeMapping.MD_SAL_TYPE +  "instead");
+        NodeIDType.registerIDType(NodeMapping.MD_SAL_TYPE, String.class);
+        return new Node(NodeMapping.MD_SAL_TYPE, nodeId.getValue());
     }
 }
index a776ef2..759e69f 100644 (file)
@@ -196,6 +196,19 @@ public class NodeMappingTest {
         Assert.assertEquals(0xCC4E241C4A000000L, NodeMapping.openflowFullNodeIdToLong("14721743935839928320").longValue());
     }
 
+    /**
+     * Test method for
+     * {@link org.opendaylight.controller.sal.compatibility.NodeMapping#toNodeKey(org.opendaylight.controller.sal.core.Node)}
+     * .
+     * @throws ConstructionException
+     */
+    @Test
+    public void testToNodeKey() throws ConstructionException {
+        org.opendaylight.controller.sal.core.Node aDNode = new org.opendaylight.controller.sal.core.Node(NodeIDType.OPENFLOW, 42L);
+        NodeKey nodeKey = NodeMapping.toNodeKey(aDNode);
+        Assert.assertEquals("openflow:42", nodeKey.getId().getValue());
+    }
+
     /**
      * @param nodeId
      * @param portId
index b76370a..9369217 100644 (file)
@@ -69,4 +69,17 @@ public class TopologyMappingTest {
         Assert.assertEquals("OF|00:00:00:00:00:00:00:01", observedNode.toString());
     }
 
+    /**
+     * Test method for {@link org.opendaylight.controller.sal.compatibility.topology.TopologyMapping#toADNodeConnector(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TpId, org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId)}.
+     * @throws ConstructionException
+     */
+    @Test
+    public void bug1309ToADNodeConnector() throws ConstructionException {
+        NodeId nodeId = new NodeId("some_unknown_node");
+        TpId source = new TpId("192.168.0.1");
+        NodeConnector observedNodeConnector = TopologyMapping.toADNodeConnector(source, nodeId);
+
+        Assert.assertEquals("MD_SAL_DEPRECATED|192.168.0.1@MD_SAL_DEPRECATED|some_unknown_node", observedNodeConnector.toString());
+    }
+
 }
index 9641859..698dbcb 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.frm.impl;
 
-import com.google.common.base.Preconditions;
 import org.opendaylight.controller.frm.ForwardingRulesManager;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
@@ -33,6 +32,8 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  * GroupForwarder
  * It implements {@link org.opendaylight.controller.md.sal.binding.api.DataChangeListener}}
@@ -52,8 +53,27 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
     public FlowForwarder (final ForwardingRulesManager manager, final DataBroker db) {
         super(manager, Flow.class);
         Preconditions.checkNotNull(db, "DataBroker can not be null!");
-        this.listenerRegistration = db.registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
-                getWildCardPath(), FlowForwarder.this, DataChangeScope.SUBTREE);
+        registrationListener(db, 5);
+    }
+
+    private void registrationListener(final DataBroker db, int i) {
+        try {
+            listenerRegistration = db.registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
+                    getWildCardPath(), FlowForwarder.this, DataChangeScope.SUBTREE);
+        } catch (final Exception e) {
+            if (i >= 1) {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e1) {
+                    LOG.error("Thread interrupted '{}'", e1);
+                    Thread.currentThread().interrupt();
+                }
+                registrationListener(db, --i);
+            } else {
+                LOG.error("FRM Flow DataChange listener registration fail!", e);
+                throw new IllegalStateException("FlowForwarder registration Listener fail! System needs restart.", e);
+            }
+        }
     }
 
     @Override
@@ -61,7 +81,7 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
         if (listenerRegistration != null) {
             try {
                 listenerRegistration.close();
-            } catch (Exception e) {
+            } catch (final Exception e) {
                 LOG.error("Error by stop FRM FlowChangeListener.", e);
             }
             listenerRegistration = null;
@@ -80,7 +100,7 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
             builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
             builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
             builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
-            this.provider.getSalFlowService().removeFlow(builder.build());
+            provider.getSalFlowService().removeFlow(builder.build());
         }
     }
 
@@ -99,7 +119,7 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
             builder.setUpdatedFlow((new UpdatedFlowBuilder(update)).build());
             builder.setOriginalFlow((new OriginalFlowBuilder(original)).build());
 
-            this.provider.getSalFlowService().updateFlow(builder.build());
+            provider.getSalFlowService().updateFlow(builder.build());
         }
     }
 
@@ -116,7 +136,7 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
             builder.setFlowRef(new FlowRef(identifier));
             builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
             builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
-            this.provider.getSalFlowService().addFlow(builder.build());
+            provider.getSalFlowService().addFlow(builder.build());
         }
     }
 
index 71a0de9..a9c81b9 100644 (file)
@@ -77,6 +77,7 @@
 
     <!-- XSQL -->
     <module>sal-dom-xsql</module>
+    <module>sal-karaf-xsql</module>
     <module>sal-dom-xsql-config</module>
 
     <!-- Yang Test Models for MD-SAL -->
index 3bfdf73..04df778 100644 (file)
@@ -11,8 +11,10 @@ package org.opendaylight.controller.cluster.example;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.japi.Creator;
+
 import com.google.common.base.Optional;
 import com.google.protobuf.ByteString;
+
 import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
 import org.opendaylight.controller.cluster.example.messages.PrintRole;
@@ -165,4 +167,24 @@ public class ExampleActor extends RaftActor {
     @Override public String persistenceId() {
         return getId();
     }
+
+    @Override
+    protected void startLogRecoveryBatch(int maxBatchSize) {
+    }
+
+    @Override
+    protected void appendRecoveredLogEntry(Payload data) {
+    }
+
+    @Override
+    protected void applyCurrentLogRecoveryBatch() {
+    }
+
+    @Override
+    protected void onRecoveryComplete() {
+    }
+
+    @Override
+    protected void applyRecoverySnapshot(ByteString snapshot) {
+    }
 }
index b436bce..2be4a0c 100644 (file)
@@ -18,13 +18,14 @@ import java.util.List;
  */
 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
-    protected List<ReplicatedLogEntry> journal;
+    // We define this as ArrayList so we can use ensureCapacity.
+    protected ArrayList<ReplicatedLogEntry> journal;
     protected ByteString snapshot;
     protected long snapshotIndex = -1;
     protected long snapshotTerm = -1;
 
     // to be used for rollback during save snapshot failure
-    protected List<ReplicatedLogEntry> snapshottedJournal;
+    protected ArrayList<ReplicatedLogEntry> snapshottedJournal;
     protected ByteString previousSnapshot;
     protected long previousSnapshotIndex = -1;
     protected long previousSnapshotTerm = -1;
@@ -106,6 +107,11 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         journal.add(replicatedLogEntry);
     }
 
+    @Override
+    public void increaseJournalLogCapacity(int amount) {
+        journal.ensureCapacity(journal.size() + amount);
+    }
+
     @Override
     public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
         return getFrom(logEntryIndex, journal.size());
@@ -208,7 +214,6 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     @Override
     public void snapshotCommit() {
-        snapshottedJournal.clear();
         snapshottedJournal = null;
         previousSnapshotIndex = -1;
         previousSnapshotTerm = -1;
@@ -218,7 +223,6 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     @Override
     public void snapshotRollback() {
         snapshottedJournal.addAll(journal);
-        journal.clear();
         journal = snapshottedJournal;
         snapshottedJournal = null;
 
index ed6439d..bff2a27 100644 (file)
@@ -26,7 +26,7 @@ public interface ConfigParams {
      *
      * @return long
      */
-    public long getSnapshotBatchCount();
+    long getSnapshotBatchCount();
 
     /**
      * The interval at which a heart beat message will be sent to the remote
@@ -34,7 +34,7 @@ public interface ConfigParams {
      *
      * @return FiniteDuration
      */
-    public FiniteDuration getHeartBeatInterval();
+    FiniteDuration getHeartBeatInterval();
 
     /**
      * The interval in which a new election would get triggered if no leader is found
@@ -43,7 +43,7 @@ public interface ConfigParams {
      *
      * @return FiniteDuration
      */
-    public FiniteDuration getElectionTimeOutInterval();
+    FiniteDuration getElectionTimeOutInterval();
 
     /**
      * The maximum election time variance. The election is scheduled using both
@@ -51,10 +51,15 @@ public interface ConfigParams {
      *
      * @return int
      */
-    public int getElectionTimeVariance();
+    int getElectionTimeVariance();
 
     /**
      * The size (in bytes) of the snapshot chunk sent from Leader
      */
-    public int getSnapshotChunkSize();
+    int getSnapshotChunkSize();
+
+    /**
+     * The number of journal log entries to batch on recovery before applying.
+     */
+    int getJournalRecoveryLogBatchSize();
 }
index 9d06f63..dc41453 100644 (file)
@@ -20,12 +20,14 @@ public class DefaultConfigParamsImpl implements ConfigParams {
 
     private static final int SNAPSHOT_BATCH_COUNT = 20000;
 
+    private static final int JOURNAL_RECOVERY_LOG_BATCH_SIZE = 1000;
+
     /**
      * The maximum election time variance
      */
     private static final int ELECTION_TIME_MAX_VARIANCE = 100;
 
-    private final int SNAPSHOT_CHUNK_SIZE = 2048 * 1000; //2MB
+    private static final int SNAPSHOT_CHUNK_SIZE = 2048 * 1000; //2MB
 
 
     /**
@@ -39,17 +41,32 @@ public class DefaultConfigParamsImpl implements ConfigParams {
         new FiniteDuration(100, TimeUnit.MILLISECONDS);
 
 
+    private FiniteDuration heartBeatInterval = HEART_BEAT_INTERVAL;
+    private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT;
+    private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE;
+
+    public void setHeartBeatInterval(FiniteDuration heartBeatInterval) {
+        this.heartBeatInterval = heartBeatInterval;
+    }
+
+    public void setSnapshotBatchCount(long snapshotBatchCount) {
+        this.snapshotBatchCount = snapshotBatchCount;
+    }
+
+    public void setJournalRecoveryLogBatchSize(int journalRecoveryLogBatchSize) {
+        this.journalRecoveryLogBatchSize = journalRecoveryLogBatchSize;
+    }
+
     @Override
     public long getSnapshotBatchCount() {
-        return SNAPSHOT_BATCH_COUNT;
+        return snapshotBatchCount;
     }
 
     @Override
     public FiniteDuration getHeartBeatInterval() {
-        return HEART_BEAT_INTERVAL;
+        return heartBeatInterval;
     }
 
-
     @Override
     public FiniteDuration getElectionTimeOutInterval() {
         // returns 2 times the heart beat interval
@@ -65,4 +82,9 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     public int getSnapshotChunkSize() {
         return SNAPSHOT_CHUNK_SIZE;
     }
+
+    @Override
+    public int getJournalRecoveryLogBatchSize() {
+        return journalRecoveryLogBatchSize;
+    }
 }
index 8270f29..64fa749 100644 (file)
@@ -20,7 +20,9 @@ import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
 import akka.persistence.UntypedPersistentActor;
 import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
 import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
@@ -38,7 +40,6 @@ import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-
 import java.io.Serializable;
 import java.util.Map;
 
@@ -96,7 +97,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
      * This context should NOT be passed directly to any other actor it is
      * only to be consumed by the RaftActorBehaviors
      */
-    protected RaftActorContext context;
+    private final RaftActorContext context;
 
     /**
      * The in-memory journal
@@ -107,6 +108,10 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
     private volatile boolean hasSnapshotCaptureInitiated = false;
 
+    private Stopwatch recoveryTimer;
+
+    private int currentRecoveryBatchCount;
+
     public RaftActor(String id, Map<String, String> peerAddresses) {
         this(id, peerAddresses, Optional.<ConfigParams>absent());
     }
@@ -121,59 +126,134 @@ public abstract class RaftActor extends UntypedPersistentActor {
             LOG);
     }
 
-    @Override public void onReceiveRecover(Object message) {
+    private void initRecoveryTimer() {
+        if(recoveryTimer == null) {
+            recoveryTimer = new Stopwatch();
+            recoveryTimer.start();
+        }
+    }
+
+    @Override
+    public void preStart() throws Exception {
+        LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(),
+                context.getConfigParams().getJournalRecoveryLogBatchSize());
+        super.preStart();
+    }
+
+    @Override
+    public void onReceiveRecover(Object message) {
         if (message instanceof SnapshotOffer) {
-            LOG.info("SnapshotOffer called..");
-            SnapshotOffer offer = (SnapshotOffer) message;
-            Snapshot snapshot = (Snapshot) offer.snapshot();
+            onRecoveredSnapshot((SnapshotOffer)message);
+        } else if (message instanceof ReplicatedLogEntry) {
+            onRecoveredJournalLogEntry((ReplicatedLogEntry)message);
+        } else if (message instanceof ApplyLogEntries) {
+            onRecoveredApplyLogEntries((ApplyLogEntries)message);
+        } else if (message instanceof DeleteEntries) {
+            replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+        } else if (message instanceof UpdateElectionTerm) {
+            context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
+                    ((UpdateElectionTerm) message).getVotedFor());
+        } else if (message instanceof RecoveryCompleted) {
+            onRecoveryCompletedMessage();
+        }
+    }
 
-            // Create a replicated log with the snapshot information
-            // The replicated log can be used later on to retrieve this snapshot
-            // when we need to install it on a peer
-            replicatedLog = new ReplicatedLogImpl(snapshot);
+    private void onRecoveredSnapshot(SnapshotOffer offer) {
+        LOG.debug("SnapshotOffer called..");
 
-            context.setReplicatedLog(replicatedLog);
-            context.setLastApplied(snapshot.getLastAppliedIndex());
-            context.setCommitIndex(snapshot.getLastAppliedIndex());
+        initRecoveryTimer();
 
-            LOG.info("Applied snapshot to replicatedLog. " +
-                    "snapshotIndex={}, snapshotTerm={}, journal-size={}",
-                replicatedLog.snapshotIndex, replicatedLog.snapshotTerm,
-                replicatedLog.size()
-            );
+        Snapshot snapshot = (Snapshot) offer.snapshot();
 
-            // Apply the snapshot to the actors state
-            applySnapshot(ByteString.copyFrom(snapshot.getState()));
+        // Create a replicated log with the snapshot information
+        // The replicated log can be used later on to retrieve this snapshot
+        // when we need to install it on a peer
+        replicatedLog = new ReplicatedLogImpl(snapshot);
 
-        } else if (message instanceof ReplicatedLogEntry) {
-            ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message;
+        context.setReplicatedLog(replicatedLog);
+        context.setLastApplied(snapshot.getLastAppliedIndex());
+        context.setCommitIndex(snapshot.getLastAppliedIndex());
 
-            // Apply State immediately
-            replicatedLog.append(logEntry);
-            applyState(null, "recovery", logEntry.getData());
-            context.setLastApplied(logEntry.getIndex());
-            context.setCommitIndex(logEntry.getIndex());
+        Stopwatch timer = new Stopwatch();
+        timer.start();
 
-        } else if (message instanceof DeleteEntries) {
-            replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+        // Apply the snapshot to the actors state
+        applyRecoverySnapshot(ByteString.copyFrom(snapshot.getState()));
 
-        } else if (message instanceof UpdateElectionTerm) {
-            context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
-                ((UpdateElectionTerm) message).getVotedFor());
+        timer.stop();
+        LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
+                replicatedLog.size(), persistenceId(), timer.toString(),
+                replicatedLog.snapshotIndex, replicatedLog.snapshotTerm);
+    }
 
-        } else if (message instanceof RecoveryCompleted) {
-            LOG.info(
-                "RecoveryCompleted - Switching actor to Follower - " +
-                    "Persistence Id =  " + persistenceId() +
-                    " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
-                    "journal-size={}",
-                replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
-                replicatedLog.snapshotTerm, replicatedLog.size());
-            currentBehavior = switchBehavior(RaftState.Follower);
-            onStateChanged();
+    private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Received ReplicatedLogEntry for recovery: {}", logEntry.getIndex());
+        }
+
+        replicatedLog.append(logEntry);
+    }
+
+    private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Received ApplyLogEntries for recovery, applying to state: {} to {}",
+                    context.getLastApplied() + 1, ale.getToIndex());
+        }
+
+        for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
+            batchRecoveredLogEntry(replicatedLog.get(i));
+        }
+
+        context.setLastApplied(ale.getToIndex());
+        context.setCommitIndex(ale.getToIndex());
+    }
+
+    private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
+        initRecoveryTimer();
+
+        int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
+        if(currentRecoveryBatchCount == 0) {
+            startLogRecoveryBatch(batchSize);
+        }
+
+        appendRecoveredLogEntry(logEntry.getData());
+
+        if(++currentRecoveryBatchCount >= batchSize) {
+            endCurrentLogRecoveryBatch();
         }
     }
 
+    private void endCurrentLogRecoveryBatch() {
+        applyCurrentLogRecoveryBatch();
+        currentRecoveryBatchCount = 0;
+    }
+
+    private void onRecoveryCompletedMessage() {
+        if(currentRecoveryBatchCount > 0) {
+            endCurrentLogRecoveryBatch();
+        }
+
+        onRecoveryComplete();
+
+        String recoveryTime = "";
+        if(recoveryTimer != null) {
+            recoveryTimer.stop();
+            recoveryTime = " in " + recoveryTimer.toString();
+            recoveryTimer = null;
+        }
+
+        LOG.info(
+            "Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
+                "Persistence Id =  " + persistenceId() +
+                " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
+                "journal-size={}",
+            replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
+            replicatedLog.snapshotTerm, replicatedLog.size());
+
+        currentBehavior = switchBehavior(RaftState.Follower);
+        onStateChanged();
+    }
+
     @Override public void onReceiveCommand(Object message) {
         if (message instanceof ApplyState){
             ApplyState applyState = (ApplyState) message;
@@ -187,6 +267,17 @@ public abstract class RaftActor extends UntypedPersistentActor {
             applyState(applyState.getClientActor(), applyState.getIdentifier(),
                 applyState.getReplicatedLogEntry().getData());
 
+        } else if (message instanceof ApplyLogEntries){
+            ApplyLogEntries ale = (ApplyLogEntries) message;
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Persisting ApplyLogEntries with index={}", ale.getToIndex());
+            }
+            persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
+                @Override
+                public void apply(ApplyLogEntries param) throws Exception {
+                }
+            });
+
         } else if(message instanceof ApplySnapshot ) {
             Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
 
@@ -373,6 +464,10 @@ public abstract class RaftActor extends UntypedPersistentActor {
         return context.getLastApplied();
     }
 
+    protected RaftActorContext getRaftActorContext() {
+        return context;
+    }
+
     /**
      * setPeerAddress sets the address of a known peer at a later time.
      * <p>
@@ -413,6 +508,38 @@ public abstract class RaftActor extends UntypedPersistentActor {
     protected abstract void applyState(ActorRef clientActor, String identifier,
         Object data);
 
+    /**
+     * This method is called during recovery at the start of a batch of state entries. Derived
+     * classes should perform any initialization needed to start a batch.
+     */
+    protected abstract void startLogRecoveryBatch(int maxBatchSize);
+
+    /**
+     * This method is called during recovery to append state data to the current batch. This method
+     * is called 1 or more times after {@link #startRecoveryStateBatch}.
+     *
+     * @param data the state data
+     */
+    protected abstract void appendRecoveredLogEntry(Payload data);
+
+    /**
+     * This method is called during recovery to reconstruct the state of the actor.
+     *
+     * @param snapshot A snapshot of the state of the actor
+     */
+    protected abstract void applyRecoverySnapshot(ByteString snapshot);
+
+    /**
+     * This method is called during recovery at the end of a batch to apply the current batched
+     * log entries. This method is called after {@link #appendRecoveryLogEntry}.
+     */
+    protected abstract void applyCurrentLogRecoveryBatch();
+
+    /**
+     * This method is called when recovery is complete.
+     */
+    protected abstract void onRecoveryComplete();
+
     /**
      * This method will be called by the RaftActor when a snapshot needs to be
      * created. The derived actor should respond with its current state.
@@ -425,10 +552,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
     protected abstract void createSnapshot();
 
     /**
-     * This method will be called by the RaftActor during recovery to
-     * reconstruct the state of the actor.
-     * <p/>
-     * This method may also be called at any other point during normal
+     * This method can be called at any other point during normal
      * operations when the derived actor is out of sync with it's peers
      * and the only way to bring it in sync is by applying a snapshot
      *
@@ -585,6 +709,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
             // of a single command.
             persist(replicatedLogEntry,
                 new Procedure<ReplicatedLogEntry>() {
+                    @Override
                     public void apply(ReplicatedLogEntry evt) throws Exception {
                         // when a snaphsot is being taken, captureSnapshot != null
                         if (hasSnapshotCaptureInitiated == false &&
@@ -649,10 +774,12 @@ public abstract class RaftActor extends UntypedPersistentActor {
         private long currentTerm = 0;
         private String votedFor = null;
 
+        @Override
         public long getCurrentTerm() {
             return currentTerm;
         }
 
+        @Override
         public String getVotedFor() {
             return votedFor;
         }
index 25da371..e4aef0a 100644 (file)
@@ -59,26 +59,32 @@ public class RaftActorContextImpl implements RaftActorContext {
         this.LOG = logger;
     }
 
+    @Override
     public ActorRef actorOf(Props props){
         return context.actorOf(props);
     }
 
+    @Override
     public ActorSelection actorSelection(String path){
         return context.actorSelection(path);
     }
 
+    @Override
     public String getId() {
         return id;
     }
 
+    @Override
     public ActorRef getActor() {
         return actor;
     }
 
+    @Override
     public ElectionTerm getTermInformation() {
         return termInformation;
     }
 
+    @Override
     public long getCommitIndex() {
         return commitIndex;
     }
@@ -87,6 +93,7 @@ public class RaftActorContextImpl implements RaftActorContext {
         this.commitIndex = commitIndex;
     }
 
+    @Override
     public long getLastApplied() {
         return lastApplied;
     }
index c17f544..8589333 100644 (file)
@@ -74,6 +74,13 @@ public interface ReplicatedLog {
      */
     void append(ReplicatedLogEntry replicatedLogEntry);
 
+    /**
+     * Optimization method to increase the capacity of the journal log prior to appending entries.
+     *
+     * @param amount the amount to increase by
+     */
+    void increaseJournalLogCapacity(int amount);
+
     /**
      *
      * @param replicatedLogEntry
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyLogEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyLogEntries.java
new file mode 100644 (file)
index 0000000..af3c4fd
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+import java.io.Serializable;
+
+/**
+ * ApplyLogEntries serves as a message which is stored in the akka's persistent
+ * journal.
+ * During recovery if this message is found, then all in-mem journal entries from
+ * context.lastApplied to ApplyLogEntries.toIndex are applied to the state
+ *
+ * This class is also used as a internal message sent from Behaviour to
+ * RaftActor to persist the ApplyLogEntries
+ *
+ */
+public class ApplyLogEntries implements Serializable {
+    private final int toIndex;
+
+    public ApplyLogEntries(int toIndex) {
+        this.toIndex = toIndex;
+    }
+
+    public int getToIndex() {
+        return toIndex;
+    }
+}
index 35d563b..b1560a5 100644 (file)
@@ -15,6 +15,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
@@ -347,6 +348,12 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         }
         context.getLogger().debug("Setting last applied to {}", newLastApplied);
         context.setLastApplied(newLastApplied);
+
+        // send a message to persist a ApplyLogEntries marker message into akka's persistent journal
+        // will be used during recovery
+        //in case if the above code throws an error and this message is not sent, it would be fine
+        // as the  append entries received later would initiate add this message to the journal
+        actor().tell(new ApplyLogEntries((int) context.getLastApplied()), actor());
     }
 
     protected Object fromSerializableMessage(Object serializable){
index 199d2d6..ff8a225 100644 (file)
@@ -88,16 +88,12 @@ public class Leader extends AbstractRaftActorBehavior {
 
         LOG = context.getLogger();
 
-        if (lastIndex() >= 0) {
-            context.setCommitIndex(lastIndex());
-        }
-
         followers = context.getPeerAddresses().keySet();
 
         for (String followerId : followers) {
             FollowerLogInformation followerLogInformation =
                 new FollowerLogInformationImpl(followerId,
-                    new AtomicLong(lastIndex()),
+                    new AtomicLong(context.getCommitIndex()),
                     new AtomicLong(-1));
 
             followerToLog.put(followerId, followerLogInformation);
index ca34a34..0d5f644 100644 (file)
@@ -200,6 +200,10 @@ public class MockRaftActorContext implements RaftActorContext {
     public static class MockPayload extends Payload implements Serializable {
         private String value = "";
 
+        public MockPayload(){
+
+        }
+
         public MockPayload(String s) {
             this.value = s;
         }
@@ -251,4 +255,24 @@ public class MockRaftActorContext implements RaftActorContext {
             return index;
         }
     }
+
+    public static class MockReplicatedLogBuilder {
+        private ReplicatedLog mockLog = new SimpleReplicatedLog();
+
+        public  MockReplicatedLogBuilder createEntries(int start, int end, int term) {
+            for (int i=start; i<end; i++) {
+                this.mockLog.append(new ReplicatedLogImplEntry(i, term, new MockRaftActorContext.MockPayload("foo" + i)));
+            }
+            return this;
+        }
+
+        public  MockReplicatedLogBuilder addEntry(int index, int term, MockPayload payload) {
+            this.mockLog.append(new ReplicatedLogImplEntry(index, term, payload));
+            return this;
+        }
+
+        public ReplicatedLog build() {
+            return this.mockLog;
+        }
+    }
 }
index 9b099c2..22f3743 100644 (file)
@@ -4,56 +4,122 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.actor.Terminated;
 import akka.event.Logging;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+import com.google.common.base.Optional;
 import com.google.protobuf.ByteString;
+import org.junit.After;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
-
+import scala.concurrent.duration.FiniteDuration;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.TestCase.assertEquals;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import static org.junit.Assert.assertEquals;
 
 public class RaftActorTest extends AbstractActorTest {
 
 
+    @After
+    public void tearDown() {
+        MockAkkaJournal.clearJournal();
+        MockSnapshotStore.setMockSnapshot(null);
+    }
+
     public static class MockRaftActor extends RaftActor {
 
-        boolean applySnapshotCalled = false;
+        public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
+            private final Map<String, String> peerAddresses;
+            private final String id;
+            private final Optional<ConfigParams> config;
+
+            private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
+                    Optional<ConfigParams> config) {
+                this.peerAddresses = peerAddresses;
+                this.id = id;
+                this.config = config;
+            }
+
+            @Override
+            public MockRaftActor create() throws Exception {
+                return new MockRaftActor(id, peerAddresses, config);
+            }
+        }
+
+        private final CountDownLatch recoveryComplete = new CountDownLatch(1);
+        private final List<Object> state;
 
-        public MockRaftActor(String id,
-            Map<String, String> peerAddresses) {
-            super(id, peerAddresses);
+        public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config) {
+            super(id, peerAddresses, config);
+            state = new ArrayList<>();
         }
 
-        public RaftActorContext getRaftActorContext() {
-            return context;
+        public void waitForRecoveryComplete() {
+            try {
+                assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
         }
 
-        public boolean isApplySnapshotCalled() {
-            return applySnapshotCalled;
+        public List<Object> getState() {
+            return state;
         }
 
-        public static Props props(final String id, final Map<String, String> peerAddresses){
-            return Props.create(new Creator<MockRaftActor>(){
+        public static Props props(final String id, final Map<String, String> peerAddresses,
+                Optional<ConfigParams> config){
+            return Props.create(new MockRaftActorCreator(peerAddresses, id, config));
+        }
 
-                @Override public MockRaftActor create() throws Exception {
-                    return new MockRaftActor(id, peerAddresses);
-                }
-            });
+        @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
+        }
+
+        @Override
+        protected void startLogRecoveryBatch(int maxBatchSize) {
+        }
+
+        @Override
+        protected void appendRecoveredLogEntry(Payload data) {
+            state.add(data);
         }
 
-        @Override protected void applyState(ActorRef clientActor,
-            String identifier,
-            Object data) {
+        @Override
+        protected void applyCurrentLogRecoveryBatch() {
+        }
+
+        @Override
+        protected void onRecoveryComplete() {
+            recoveryComplete.countDown();
+        }
+
+        @Override
+        protected void applyRecoverySnapshot(ByteString snapshot) {
+            try {
+                Object data = toObject(snapshot);
+                System.out.println("!!!!!applyRecoverySnapshot: "+data);
+                if (data instanceof List) {
+                    state.addAll((List) data);
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
         }
 
         @Override protected void createSnapshot() {
@@ -61,7 +127,6 @@ public class RaftActorTest extends AbstractActorTest {
         }
 
         @Override protected void applySnapshot(ByteString snapshot) {
-           applySnapshotCalled = true;
         }
 
         @Override protected void onStateChanged() {
@@ -71,6 +136,26 @@ public class RaftActorTest extends AbstractActorTest {
             return this.getId();
         }
 
+        private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
+            Object obj = null;
+            ByteArrayInputStream bis = null;
+            ObjectInputStream ois = null;
+            try {
+                bis = new ByteArrayInputStream(bs.toByteArray());
+                ois = new ObjectInputStream(bis);
+                obj = ois.readObject();
+            } finally {
+                if (bis != null) {
+                    bis.close();
+                }
+                if (ois != null) {
+                    ois.close();
+                }
+            }
+            return obj;
+        }
+
+
     }
 
 
@@ -80,9 +165,8 @@ public class RaftActorTest extends AbstractActorTest {
         public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
             super(actorSystem);
 
-            raftActor = this.getSystem()
-                .actorOf(MockRaftActor.props(actorName,
-                    Collections.EMPTY_MAP), actorName);
+            raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
+                    Collections.EMPTY_MAP, Optional.<ConfigParams>absent()), actorName);
 
         }
 
@@ -92,6 +176,7 @@ public class RaftActorTest extends AbstractActorTest {
             return
                 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
                 ) {
+                    @Override
                     protected Boolean run() {
                         return true;
                     }
@@ -103,37 +188,15 @@ public class RaftActorTest extends AbstractActorTest {
         }
 
         public void findLeader(final String expectedLeader){
+            raftActor.tell(new FindLeader(), getRef());
 
-
-            new Within(duration("1 seconds")) {
-                protected void run() {
-
-                    raftActor.tell(new FindLeader(), getRef());
-
-                    String s = new ExpectMsg<String>(duration("1 seconds"),
-                        "findLeader") {
-                        // do not put code outside this method, will run afterwards
-                        protected String match(Object in) {
-                            if (in instanceof FindLeaderReply) {
-                                return ((FindLeaderReply) in).getLeaderActor();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get();// this extracts the received message
-
-                    assertEquals(expectedLeader, s);
-
-                }
-
-
-            };
+            FindLeaderReply reply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
+            assertEquals("getLeaderActor", expectedLeader, reply.getLeaderActor());
         }
 
         public ActorRef getRaftActor() {
             return raftActor;
         }
-
     }
 
 
@@ -151,55 +214,103 @@ public class RaftActorTest extends AbstractActorTest {
     }
 
     @Test
-    public void testActorRecovery() {
+    public void testRaftActorRecovery() throws Exception {
         new JavaTestKit(getSystem()) {{
-            new Within(duration("1 seconds")) {
-                protected void run() {
-
-                    String persistenceId = "follower10";
-
-                    ActorRef followerActor = getSystem().actorOf(
-                        MockRaftActor.props(persistenceId, Collections.EMPTY_MAP), persistenceId);
-
-
-                    List<ReplicatedLogEntry> entries = new ArrayList<>();
-                    ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("E"));
-                    ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("F"));
-                    entries.add(entry1);
-                    entries.add(entry2);
-
-                    int lastApplied = 3;
-                    int lastIndex = 5;
-                    Snapshot snapshot = Snapshot.create("A B C D".getBytes(), entries, lastIndex, 1 , lastApplied, 1);
-                    MockSnapshotStore.setMockSnapshot(snapshot);
-                    MockSnapshotStore.setPersistenceId(persistenceId);
-
-                    followerActor.tell(PoisonPill.getInstance(), null);
-                    try {
-                        // give some time for actor to die
-                        Thread.sleep(200);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
-
-                    TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, Collections.EMPTY_MAP));
-                    try {
-                        //give some time for snapshot offer to get called.
-                        Thread.sleep(200);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
-                    RaftActorContext context = ref.underlyingActor().getRaftActorContext();
-                    assertEquals(entries.size(), context.getReplicatedLog().size());
-                    assertEquals(lastApplied, context.getLastApplied());
-                    assertEquals(lastApplied, context.getCommitIndex());
-                    assertTrue(ref.underlyingActor().isApplySnapshotCalled());
-                }
-
-            };
+            String persistenceId = "follower10";
+
+            DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+            // Set the heartbeat interval high to essentially disable election otherwise the test
+            // may fail if the actor is switched to Leader and the commitIndex is set to the last
+            // log entry.
+            config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+            ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId,
+                    Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
+
+            watch(followerActor);
+
+            List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
+            ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
+                    new MockRaftActorContext.MockPayload("E"));
+            snapshotUnappliedEntries.add(entry1);
+
+            int lastAppliedDuringSnapshotCapture = 3;
+            int lastIndexDuringSnapshotCapture = 4;
+
+                // 4 messages as part of snapshot, which are applied to state
+            ByteString snapshotBytes  = fromObject(Arrays.asList(
+                        new MockRaftActorContext.MockPayload("A"),
+                        new MockRaftActorContext.MockPayload("B"),
+                        new MockRaftActorContext.MockPayload("C"),
+                        new MockRaftActorContext.MockPayload("D")));
+
+            Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
+                    snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
+                    lastAppliedDuringSnapshotCapture, 1);
+            MockSnapshotStore.setMockSnapshot(snapshot);
+            MockSnapshotStore.setPersistenceId(persistenceId);
+
+            // add more entries after snapshot is taken
+            List<ReplicatedLogEntry> entries = new ArrayList<>();
+            ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
+                    new MockRaftActorContext.MockPayload("F"));
+            ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
+                    new MockRaftActorContext.MockPayload("G"));
+            ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
+                    new MockRaftActorContext.MockPayload("H"));
+            entries.add(entry2);
+            entries.add(entry3);
+            entries.add(entry4);
+
+            int lastAppliedToState = 5;
+            int lastIndex = 7;
+
+            MockAkkaJournal.addToJournal(5, entry2);
+            // 2 entries are applied to state besides the 4 entries in snapshot
+            MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
+            MockAkkaJournal.addToJournal(7, entry3);
+            MockAkkaJournal.addToJournal(8, entry4);
+
+            // kill the actor
+            followerActor.tell(PoisonPill.getInstance(), null);
+            expectMsgClass(duration("5 seconds"), Terminated.class);
+
+            unwatch(followerActor);
+
+            //reinstate the actor
+            TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
+                    MockRaftActor.props(persistenceId, Collections.EMPTY_MAP,
+                            Optional.<ConfigParams>of(config)));
+
+            ref.underlyingActor().waitForRecoveryComplete();
+
+            RaftActorContext context = ref.underlyingActor().getRaftActorContext();
+            assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
+                    context.getReplicatedLog().size());
+            assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
+            assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
+            assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
+            assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
         }};
-
     }
 
-
+    private ByteString fromObject(Object snapshot) throws Exception {
+        ByteArrayOutputStream b = null;
+        ObjectOutputStream o = null;
+        try {
+            b = new ByteArrayOutputStream();
+            o = new ObjectOutputStream(b);
+            o.writeObject(snapshot);
+            byte[] snapshotBytes = b.toByteArray();
+            return ByteString.copyFrom(snapshotBytes);
+        } finally {
+            if (o != null) {
+                o.flush();
+                o.close();
+            }
+            if (b != null) {
+                b.close();
+            }
+        }
+    }
 }
index fd4a75a..a72a7c4 100644 (file)
@@ -3,7 +3,6 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
-import akka.util.Timeout;
 import com.google.protobuf.ByteString;
 import junit.framework.Assert;
 import org.junit.Test;
@@ -22,10 +21,6 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -35,9 +30,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
-import static akka.pattern.Patterns.ask;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -527,15 +520,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
     }
 
     public Object executeLocalOperation(ActorRef actor, Object message) throws Exception {
-        FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
-        Timeout operationTimeout = new Timeout(operationDuration);
-        Future<Object> future = ask(actor, message, operationTimeout);
-
-        try {
-            return Await.result(future, operationDuration);
-        } catch (Exception e) {
-            throw e;
-        }
+        return MessageCollectorActor.getAllMessages(actor);
     }
 
     public ByteString getNextChunk (ByteString bs, int offset){
index c4ef51d..19af647 100644 (file)
@@ -20,9 +20,12 @@ import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
 
 import java.io.ByteArrayOutputStream;
@@ -171,18 +174,13 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
                     actorContext.getReplicatedLog().removeFrom(0);
 
-                    actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(0, 1,
-                        new MockRaftActorContext.MockPayload("foo")));
-
-                    ReplicatedLogImplEntry entry =
-                        new ReplicatedLogImplEntry(1, 1,
-                            new MockRaftActorContext.MockPayload("foo"));
-
-                    actorContext.getReplicatedLog().append(entry);
+                    actorContext.setReplicatedLog(
+                        new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
+                            .build());
 
                     Leader leader = new Leader(actorContext);
                     RaftState raftState = leader
-                        .handleMessage(senderActor, new Replicate(null, "state-id",entry));
+                        .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
 
                     // State should not change
                     assertEquals(RaftState.Leader, raftState);
@@ -335,7 +333,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                         new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
                             new MockRaftActorContext.MockPayload("D"));
 
-
                     RaftState raftState = leader.handleMessage(senderActor, new SendInstallSnapshot());
 
                     assertEquals(RaftState.Leader, raftState);
@@ -526,6 +523,157 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         return null;
     }
 
+    public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
+        private static AbstractRaftActorBehavior behavior;
+
+        public ForwardMessageToBehaviorActor(){
+
+        }
+
+        @Override public void onReceive(Object message) throws Exception {
+            super.onReceive(message);
+            behavior.handleMessage(sender(), message);
+        }
+
+        public static void setBehavior(AbstractRaftActorBehavior behavior){
+            ForwardMessageToBehaviorActor.behavior = behavior;
+        }
+    }
+
+    @Test
+    public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+            MockRaftActorContext leaderActorContext =
+                new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+            ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
+
+            MockRaftActorContext followerActorContext =
+                new MockRaftActorContext("follower", getSystem(), followerActor);
+
+            Follower follower = new Follower(followerActorContext);
+
+            ForwardMessageToBehaviorActor.setBehavior(follower);
+
+            Map<String, String> peerAddresses = new HashMap();
+            peerAddresses.put(followerActor.path().toString(),
+                followerActor.path().toString());
+
+            leaderActorContext.setPeerAddresses(peerAddresses);
+
+            leaderActorContext.getReplicatedLog().removeFrom(0);
+
+            //create 3 entries
+            leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+            leaderActorContext.setCommitIndex(1);
+
+            followerActorContext.getReplicatedLog().removeFrom(0);
+
+            // follower too has the exact same log entries and has the same commit index
+            followerActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+            followerActorContext.setCommitIndex(1);
+
+            Leader leader = new Leader(leaderActorContext);
+
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+
+            AppendEntriesMessages.AppendEntries appendEntries =
+                (AppendEntriesMessages.AppendEntries) MessageCollectorActor
+                    .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
+
+            assertNotNull(appendEntries);
+
+            assertEquals(1, appendEntries.getLeaderCommit());
+            assertEquals(1, appendEntries.getLogEntries(0).getIndex());
+            assertEquals(0, appendEntries.getPrevLogIndex());
+
+            AppendEntriesReply appendEntriesReply =
+                (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
+                    leaderActor, AppendEntriesReply.class);
+
+            assertNotNull(appendEntriesReply);
+
+            // follower returns its next index
+            assertEquals(2, appendEntriesReply.getLogLastIndex());
+            assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+        }};
+    }
+
+
+    @Test
+    public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+            MockRaftActorContext leaderActorContext =
+                new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+            ActorRef followerActor = getSystem().actorOf(
+                Props.create(ForwardMessageToBehaviorActor.class));
+
+            MockRaftActorContext followerActorContext =
+                new MockRaftActorContext("follower", getSystem(), followerActor);
+
+            Follower follower = new Follower(followerActorContext);
+
+            ForwardMessageToBehaviorActor.setBehavior(follower);
+
+            Map<String, String> peerAddresses = new HashMap();
+            peerAddresses.put(followerActor.path().toString(),
+                followerActor.path().toString());
+
+            leaderActorContext.setPeerAddresses(peerAddresses);
+
+            leaderActorContext.getReplicatedLog().removeFrom(0);
+
+            leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+            leaderActorContext.setCommitIndex(1);
+
+            followerActorContext.getReplicatedLog().removeFrom(0);
+
+            followerActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+            // follower has the same log entries but its commit index > leaders commit index
+            followerActorContext.setCommitIndex(2);
+
+            Leader leader = new Leader(leaderActorContext);
+
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+
+            AppendEntriesMessages.AppendEntries appendEntries =
+                (AppendEntriesMessages.AppendEntries) MessageCollectorActor
+                    .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
+
+            assertNotNull(appendEntries);
+
+            assertEquals(1, appendEntries.getLeaderCommit());
+            assertEquals(1, appendEntries.getLogEntries(0).getIndex());
+            assertEquals(0, appendEntries.getPrevLogIndex());
+
+            AppendEntriesReply appendEntriesReply =
+                (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
+                    leaderActor, AppendEntriesReply.class);
+
+            assertNotNull(appendEntriesReply);
+
+            assertEquals(2, appendEntriesReply.getLogLastIndex());
+            assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+        }};
+    }
+
     private static class LeaderTestKit extends JavaTestKit {
 
         private LeaderTestKit(ActorSystem actorSystem) {
index 88eecfe..5892845 100644 (file)
@@ -8,10 +8,18 @@
 
 package org.opendaylight.controller.cluster.raft.utils;
 
+import akka.actor.ActorRef;
 import akka.actor.UntypedActor;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 
 public class MessageCollectorActor extends UntypedActor {
@@ -26,4 +34,35 @@ public class MessageCollectorActor extends UntypedActor {
             messages.add(message);
         }
     }
+
+    public static List<Object> getAllMessages(ActorRef actor) throws Exception {
+        FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
+        Timeout operationTimeout = new Timeout(operationDuration);
+        Future<Object> future = Patterns.ask(actor, "get-all-messages", operationTimeout);
+
+        try {
+            return (List<Object>) Await.result(future, operationDuration);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    /**
+     * Get the first message that matches the specified class
+     * @param actor
+     * @param clazz
+     * @return
+     */
+    public static Object getFirstMatching(ActorRef actor, Class clazz) throws Exception {
+        List<Object> allMessages = getAllMessages(actor);
+
+        for(Object message : allMessages){
+            if(message.getClass().equals(clazz)){
+                return message;
+            }
+        }
+
+        return null;
+    }
+
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockAkkaJournal.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockAkkaJournal.java
new file mode 100644 (file)
index 0000000..85edc07
--- /dev/null
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.utils;
+
+import akka.dispatch.Futures;
+import akka.japi.Procedure;
+import akka.persistence.PersistentConfirmation;
+import akka.persistence.PersistentId;
+import akka.persistence.PersistentImpl;
+import akka.persistence.PersistentRepr;
+import akka.persistence.journal.japi.AsyncWriteJournal;
+import com.google.common.collect.Maps;
+import scala.concurrent.Future;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+public class MockAkkaJournal extends AsyncWriteJournal {
+
+    private static Map<Long, Object> journal = Maps.newHashMap();
+
+    public static void addToJournal(long sequenceNr, Object message) {
+        journal.put(sequenceNr, message);
+    }
+
+    public static void clearJournal() {
+        journal.clear();
+    }
+
+    @Override
+    public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr,
+        long toSequenceNr, long max, final Procedure<PersistentRepr> replayCallback) {
+
+        return Futures.future(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                for (Map.Entry<Long,Object> entry : journal.entrySet()) {
+                    PersistentRepr persistentMessage =
+                        new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId, false, null, null);
+                    replayCallback.apply(persistentMessage);
+                }
+                return null;
+            }
+        }, context().dispatcher());
+    }
+
+    @Override
+    public Future<Long> doAsyncReadHighestSequenceNr(String s, long l) {
+        return Futures.successful(new Long(0));
+    }
+
+    @Override
+    public Future<Void> doAsyncWriteMessages(Iterable<PersistentRepr> persistentReprs) {
+        return Futures.successful(null);
+    }
+
+    @Override
+    public Future<Void> doAsyncWriteConfirmations(Iterable<PersistentConfirmation> persistentConfirmations) {
+        return Futures.successful(null);
+    }
+
+    @Override
+    public Future<Void> doAsyncDeleteMessages(Iterable<PersistentId> persistentIds, boolean b) {
+        return Futures.successful(null);
+    }
+
+    @Override
+    public Future<Void> doAsyncDeleteMessagesTo(String s, long l, boolean b) {
+        return Futures.successful(null);
+    }
+}
index 6b2cc22..2f53d4a 100644 (file)
@@ -1,5 +1,6 @@
 akka {
     persistence.snapshot-store.plugin = "mock-snapshot-store"
+    persistence.journal.plugin = "mock-journal"
 
     loglevel = "DEBUG"
     loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"]
@@ -28,3 +29,10 @@ mock-snapshot-store {
   # Dispatcher for the plugin actor.
   plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
 }
+
+mock-journal {
+  # Class name of the plugin.
+  class = "org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal"
+  # Dispatcher for the plugin actor.
+  plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+}
index 73c81ca..36b7a0f 100644 (file)
@@ -34,15 +34,15 @@ final class BindingTranslatedTransactionChain implements BindingTransactionChain
 
     private final DOMTransactionChain delegate;
     private final BindingToNormalizedNodeCodec codec;
-    private final DelegateChainListener delegatingListener;
-    private final TransactionChainListener listener;
+    private final DelegateChainListener domListener;
+    private final TransactionChainListener bindingListener;
 
     public BindingTranslatedTransactionChain(final DOMDataBroker chainFactory,
             final BindingToNormalizedNodeCodec codec, final TransactionChainListener listener) {
         Preconditions.checkNotNull(chainFactory, "DOM Transaction chain factory must not be null");
-        this.delegatingListener = new DelegateChainListener();
-        this.listener = listener;
-        this.delegate = chainFactory.createTransactionChain(listener);
+        this.domListener = new DelegateChainListener();
+        this.bindingListener = listener;
+        this.delegate = chainFactory.createTransactionChain(domListener);
         this.codec = codec;
     }
 
@@ -110,7 +110,7 @@ final class BindingTranslatedTransactionChain implements BindingTransactionChain
          * chain, so we are not changing any of our internal state
          * to mark that we failed.
          */
-        this.delegatingListener.onTransactionChainFailed(this, tx, t);
+        this.bindingListener.onTransactionChainFailed(this, tx, t);
     }
 
     @Override
@@ -141,7 +141,7 @@ final class BindingTranslatedTransactionChain implements BindingTransactionChain
         public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
             Preconditions.checkState(delegate.equals(chain),
                     "Illegal state - listener for %s was invoked for incorrect chain %s.", delegate, chain);
-            listener.onTransactionChainSuccessful(BindingTranslatedTransactionChain.this);
+            bindingListener.onTransactionChainSuccessful(BindingTranslatedTransactionChain.this);
         }
     }
 
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java
new file mode 100644 (file)
index 0000000..36b2866
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.common.actor;
+
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.persistence.UntypedPersistentActor;
+
+public abstract class AbstractUntypedPersistentActor extends UntypedPersistentActor {
+
+    protected final LoggingAdapter LOG =
+        Logging.getLogger(getContext().system(), this);
+
+    public AbstractUntypedPersistentActor() {
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Actor created {}", getSelf());
+        }
+        getContext().
+            system().
+            actorSelection("user/termination-monitor").
+            tell(new Monitor(getSelf()), getSelf());
+
+    }
+
+
+    @Override public void onReceiveCommand(Object message) throws Exception {
+        final String messageType = message.getClass().getSimpleName();
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Received message {}", messageType);
+        }
+        handleCommand(message);
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Done handling message {}", messageType);
+        }
+
+    }
+
+    @Override public void onReceiveRecover(Object message) throws Exception {
+        final String messageType = message.getClass().getSimpleName();
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Received message {}", messageType);
+        }
+        handleRecover(message);
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Done handling message {}", messageType);
+        }
+
+    }
+
+    protected abstract void handleRecover(Object message) throws Exception;
+
+    protected abstract void handleCommand(Object message) throws Exception;
+
+    protected void ignoreMessage(Object message) {
+        LOG.debug("Unhandled message {} ", message);
+    }
+
+    protected void unknownMessage(Object message) throws Exception {
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Received unhandled message {}", message);
+        }
+        unhandled(message);
+    }
+}
index 3e1bd35..44da4a5 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.datastore.node.utils.serialization;
 
 import com.google.common.base.Preconditions;
+
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
 import org.opendaylight.yangtools.yang.common.SimpleDateFormatUtil;
 import org.opendaylight.yangtools.yang.data.api.Node;
@@ -36,6 +37,7 @@ import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.NormalizedNo
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -232,7 +234,7 @@ public class NormalizedNodeSerializer {
 
     private static class DeSerializer implements NormalizedNodeDeSerializationContext {
         private static Map<NormalizedNodeType, DeSerializationFunction>
-            deSerializationFunctions = new HashMap<>();
+            deSerializationFunctions = new EnumMap<>(NormalizedNodeType.class);
 
         static {
             deSerializationFunctions.put(CONTAINER_NODE_TYPE,
@@ -447,8 +449,9 @@ public class NormalizedNodeSerializer {
 
         private NormalizedNode deSerialize(NormalizedNodeMessages.Node node){
             Preconditions.checkNotNull(node, "node should not be null");
-            DeSerializationFunction deSerializationFunction =
-                Preconditions.checkNotNull(deSerializationFunctions.get(NormalizedNodeType.values()[node.getIntType()]), "Unknown type " + node);
+
+            DeSerializationFunction deSerializationFunction = deSerializationFunctions.get(
+                    NormalizedNodeType.values()[node.getIntType()]);
 
             return deSerializationFunction.apply(this, node);
         }
@@ -544,8 +547,4 @@ public class NormalizedNodeSerializer {
             NormalizedNode apply(DeSerializer deserializer, NormalizedNodeMessages.Node node);
         }
     }
-
-
-
-
 }
index d7627c0..4fb676e 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.datastore.node.utils.serialization;
 
 import com.google.common.base.Preconditions;
+
 import org.opendaylight.controller.cluster.datastore.node.utils.NodeIdentifierFactory;
 import org.opendaylight.controller.cluster.datastore.node.utils.QNameFactory;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
@@ -26,6 +27,7 @@ import java.util.Set;
 import static org.opendaylight.controller.cluster.datastore.node.utils.serialization.PathArgumentType.getSerializablePathArgumentType;
 
 public class PathArgumentSerializer {
+    private static final String REVISION_ARG = "?revision=";
     private static final Map<Class, PathArgumentAttributesGetter> pathArgumentAttributesGetters = new HashMap<>();
 
     public static NormalizedNodeMessages.PathArgument serialize(NormalizedNodeSerializationContext context, YangInstanceIdentifier.PathArgument pathArgument){
@@ -190,27 +192,24 @@ public class PathArgumentSerializer {
         // If this serializer is used qName cannot be null (see encodeQName)
         // adding null check only in case someone tried to deSerialize a protocol buffer node
         // that was not serialized using the PathArgumentSerializer
-        Preconditions.checkNotNull(qName, "qName should not be null");
-        Preconditions.checkArgument(!"".equals(qName.getLocalName()),
-            "qName.localName cannot be empty qName = " + qName.toString());
-        Preconditions.checkArgument(qName.getNamespace() != -1, "qName.namespace should be valid");
+//        Preconditions.checkNotNull(qName, "qName should not be null");
+//        Preconditions.checkArgument(qName.getNamespace() != -1, "qName.namespace should be valid");
 
-        StringBuilder sb = new StringBuilder();
         String namespace = context.getNamespace(qName.getNamespace());
-        String revision = "";
         String localName = context.getLocalName(qName.getLocalName());
+        StringBuilder sb;
         if(qName.getRevision() != -1){
-            revision = context.getRevision(qName.getRevision());
-            sb.append("(").append(namespace).append("?revision=").append(
-                revision).append(")").append(
-                localName);
+            String revision = context.getRevision(qName.getRevision());
+            sb = new StringBuilder(namespace.length() + REVISION_ARG.length() + revision.length() +
+                    localName.length() + 2);
+            sb.append('(').append(namespace).append(REVISION_ARG).append(
+                revision).append(')').append(localName);
         } else {
-            sb.append("(").append(namespace).append(")").append(
-                localName);
+            sb = new StringBuilder(namespace.length() + localName.length() + 2);
+            sb.append('(').append(namespace).append(')').append(localName);
         }
 
         return sb.toString();
-
     }
 
     /**
@@ -223,10 +222,6 @@ public class PathArgumentSerializer {
         NormalizedNodeDeSerializationContext context,
         NormalizedNodeMessages.PathArgument pathArgument) {
 
-        Preconditions.checkArgument(pathArgument.getIntType() >= 0
-            && pathArgument.getIntType() < PathArgumentType.values().length,
-            "Illegal PathArgumentType " + pathArgument.getIntType());
-
         switch(PathArgumentType.values()[pathArgument.getIntType()]){
             case NODE_IDENTIFIER_WITH_VALUE : {
 
@@ -272,13 +267,21 @@ public class PathArgumentSerializer {
         NormalizedNodeDeSerializationContext context,
         List<NormalizedNodeMessages.PathArgumentAttribute> attributesList) {
 
-        Map<QName, Object> map = new HashMap<>();
-
-        for(NormalizedNodeMessages.PathArgumentAttribute attribute : attributesList){
+        Map<QName, Object> map;
+        if(attributesList.size() == 1) {
+            NormalizedNodeMessages.PathArgumentAttribute attribute = attributesList.get(0);
             NormalizedNodeMessages.QName name = attribute.getName();
             Object value = parseAttribute(context, attribute);
+            map = Collections.singletonMap(QNameFactory.create(qNameToString(context, name)), value);
+        } else {
+            map = new HashMap<>();
+
+            for(NormalizedNodeMessages.PathArgumentAttribute attribute : attributesList){
+                NormalizedNodeMessages.QName name = attribute.getName();
+                Object value = parseAttribute(context, attribute);
 
-            map.put(QNameFactory.create(qNameToString(context, name)), value);
+                map.put(QNameFactory.create(qNameToString(context, name)), value);
+            }
         }
 
         return map;
index 04c95d6..8def754 100644 (file)
@@ -8,7 +8,6 @@
 
 package org.opendaylight.controller.cluster.datastore.node.utils.serialization;
 
-import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.node.utils.QNameFactory;
 import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
@@ -70,9 +69,6 @@ public class ValueSerializer {
 
 
     private static Object deSerializeBasicTypes(int valueType, String value) {
-        Preconditions.checkArgument(valueType >= 0 && valueType < ValueType.values().length,
-            "Illegal value type " + valueType );
-
         switch(ValueType.values()[valueType]){
            case SHORT_TYPE: {
                return Short.valueOf(value);
index 8724dfe..49db896 100644 (file)
@@ -50,8 +50,9 @@ public enum ValueType {
     public static final ValueType getSerializableType(Object node){
         Preconditions.checkNotNull(node, "node should not be null");
 
-        if(types.containsKey(node.getClass())) {
-            return types.get(node.getClass());
+        ValueType type = types.get(node.getClass());
+        if(type != null) {
+            return type;
         } else if(node instanceof Set){
             return BITS_TYPE;
         }
index 1021dde..83164b0 100644 (file)
@@ -8,11 +8,12 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
-import com.google.common.base.Preconditions;
-
+import org.opendaylight.controller.cluster.raft.ConfigParams;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 
 import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.util.concurrent.TimeUnit;
 
@@ -27,22 +28,30 @@ public class DatastoreContext {
     private final Duration shardTransactionIdleTimeout;
     private final int operationTimeoutInSeconds;
     private final String dataStoreMXBeanType;
+    private final ConfigParams shardRaftConfig;
 
     public DatastoreContext() {
-        this.dataStoreProperties = null;
-        this.dataStoreMXBeanType = "DistributedDatastore";
-        this.shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES);
-        this.operationTimeoutInSeconds = 5;
+        this("DistributedDatastore", null, Duration.create(10, TimeUnit.MINUTES), 5, 1000, 20000, 500);
     }
 
     public DatastoreContext(String dataStoreMXBeanType,
             InMemoryDOMDataStoreConfigProperties dataStoreProperties,
             Duration shardTransactionIdleTimeout,
-            int operationTimeoutInSeconds) {
+            int operationTimeoutInSeconds,
+            int shardJournalRecoveryLogBatchSize,
+            int shardSnapshotBatchCount,
+            int shardHeartbeatIntervalInMillis) {
         this.dataStoreMXBeanType = dataStoreMXBeanType;
-        this.dataStoreProperties = Preconditions.checkNotNull(dataStoreProperties);
+        this.dataStoreProperties = dataStoreProperties;
         this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
         this.operationTimeoutInSeconds = operationTimeoutInSeconds;
+
+        DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
+        raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
+                TimeUnit.MILLISECONDS));
+        raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
+        raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
+        shardRaftConfig = raftConfig;
     }
 
     public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
@@ -60,4 +69,8 @@ public class DatastoreContext {
     public int getOperationTimeoutInSeconds() {
         return operationTimeoutInSeconds;
     }
+
+    public ConfigParams getShardRaftConfig() {
+        return shardRaftConfig;
+    }
 }
index 0fa2770..ddb5989 100644 (file)
@@ -20,6 +20,7 @@ import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -47,12 +48,11 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
-import org.opendaylight.controller.cluster.raft.ConfigParams;
-import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
@@ -67,14 +67,12 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 
 /**
  * A Shard represents a portion of the logical data tree <br/>
@@ -84,8 +82,6 @@ import java.util.concurrent.TimeUnit;
  */
 public class Shard extends RaftActor {
 
-    private static final ConfigParams configParams = new ShardConfigParams();
-
     public static final String DEFAULT_NAME = "default";
 
     // The state of this Shard
@@ -114,11 +110,18 @@ public class Shard extends RaftActor {
 
     private ActorRef createSnapshotTransaction;
 
+    /**
+     * Coordinates persistence recovery on startup.
+     */
+    private ShardRecoveryCoordinator recoveryCoordinator;
+    private List<Object> currentLogRecoveryBatch;
+
     private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
 
-    private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
+    protected Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
             DatastoreContext datastoreContext, SchemaContext schemaContext) {
-        super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams));
+        super(name.toString(), mapPeerAddresses(peerAddresses),
+                Optional.of(datastoreContext.getShardRaftConfig()));
 
         this.name = name;
         this.datastoreContext = datastoreContext;
@@ -333,35 +336,12 @@ public class Shard extends RaftActor {
         DOMStoreThreePhaseCommitCohort cohort =
             modificationToCohort.remove(serialized);
         if (cohort == null) {
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug(
-                    "Could not find cohort for modification : {}. Writing modification using a new transaction",
-                    modification);
-            }
-
-            DOMStoreWriteTransaction transaction =
-                store.newWriteOnlyTransaction();
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("Created new transaction {}", transaction.getIdentifier().toString());
-            }
-
-            modification.apply(transaction);
-            try {
-                syncCommitTransaction(transaction);
-            } catch (InterruptedException | ExecutionException e) {
-                shardMBean.incrementFailedTransactionsCount();
-                LOG.error("Failed to commit", e);
-                return;
-            }
-            //we want to just apply the recovery commit and return
-            shardMBean.incrementCommittedTransactionCount();
+            // If there's no cached cohort then we must be applying replicated state.
+            commitWithNewTransaction(serialized);
             return;
         }
 
-
-        if(sender == null){
+        if(sender == null) {
             LOG.error("Commit failed. Sender cannot be null");
             return;
         }
@@ -386,6 +366,18 @@ public class Shard extends RaftActor {
 
     }
 
+    private void commitWithNewTransaction(Object modification) {
+        DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
+        MutableCompositeModification.fromSerializable(modification, schemaContext).apply(tx);
+        try {
+            syncCommitTransaction(tx);
+            shardMBean.incrementCommittedTransactionCount();
+        } catch (InterruptedException | ExecutionException e) {
+            shardMBean.incrementFailedTransactionsCount();
+            LOG.error(e, "Failed to commit");
+        }
+    }
+
     private void handleForwardedCommit(ForwardedCommitTransaction message) {
         Object serializedModification =
             message.getModification().toSerializable();
@@ -461,26 +453,102 @@ public class Shard extends RaftActor {
         return config.isMetricCaptureEnabled();
     }
 
-    @Override protected void applyState(ActorRef clientActor, String identifier,
-        Object data) {
+    @Override
+    protected
+    void startLogRecoveryBatch(int maxBatchSize) {
+        currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("{} : starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
+        }
+    }
+
+    @Override
+    protected void appendRecoveredLogEntry(Payload data) {
+        if (data instanceof CompositeModificationPayload) {
+            currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
+        } else {
+            LOG.error("Unknown state received {} during recovery", data);
+        }
+    }
+
+    @Override
+    protected void applyRecoverySnapshot(ByteString snapshot) {
+        if(recoveryCoordinator == null) {
+            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
+        }
+
+        recoveryCoordinator.submit(snapshot, store.newWriteOnlyTransaction());
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("{} : submitted recovery sbapshot", persistenceId());
+        }
+    }
+
+    @Override
+    protected void applyCurrentLogRecoveryBatch() {
+        if(recoveryCoordinator == null) {
+            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
+        }
+
+        recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("{} : submitted log recovery batch with size {}", persistenceId(),
+                    currentLogRecoveryBatch.size());
+        }
+    }
+
+    @Override
+    protected void onRecoveryComplete() {
+        if(recoveryCoordinator != null) {
+            Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
+
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("{} : recovery complete - committing {} Tx's", persistenceId(), txList.size());
+            }
+
+            for(DOMStoreWriteTransaction tx: txList) {
+                try {
+                    syncCommitTransaction(tx);
+                    shardMBean.incrementCommittedTransactionCount();
+                } catch (InterruptedException | ExecutionException e) {
+                    shardMBean.incrementFailedTransactionsCount();
+                    LOG.error(e, "Failed to commit");
+                }
+            }
+        }
+
+        recoveryCoordinator = null;
+        currentLogRecoveryBatch = null;
+        updateJournalStats();
+    }
+
+    @Override
+    protected void applyState(ActorRef clientActor, String identifier, Object data) {
 
         if (data instanceof CompositeModificationPayload) {
-            Object modification =
-                ((CompositeModificationPayload) data).getModification();
+            Object modification = ((CompositeModificationPayload) data).getModification();
 
             if (modification != null) {
                 commit(clientActor, modification);
             } else {
                 LOG.error(
                     "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
-                    identifier, clientActor.path().toString());
+                    identifier, clientActor != null ? clientActor.path().toString() : null);
             }
 
         } else {
-            LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}", data, data.getClass().getClassLoader(), CompositeModificationPayload.class.getClassLoader());
+            LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
+                    data, data.getClass().getClassLoader(),
+                    CompositeModificationPayload.class.getClassLoader());
         }
 
-        // Update stats
+        updateJournalStats();
+
+    }
+
+    private void updateJournalStats() {
         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
 
         if (lastLogEntry != null) {
@@ -490,10 +558,10 @@ public class Shard extends RaftActor {
 
         shardMBean.setCommitIndex(getCommitIndex());
         shardMBean.setLastApplied(getLastApplied());
-
     }
 
-    @Override protected void createSnapshot() {
+    @Override
+    protected void createSnapshot() {
         if (createSnapshotTransaction == null) {
 
             // Create a transaction. We are really going to treat the transaction as a worker
@@ -508,7 +576,9 @@ public class Shard extends RaftActor {
         }
     }
 
-    @VisibleForTesting @Override protected void applySnapshot(ByteString snapshot) {
+    @VisibleForTesting
+    @Override
+    protected void applySnapshot(ByteString snapshot) {
         // Since this will be done only on Recovery or when this actor is a Follower
         // we can safely commit everything in here. We not need to worry about event notifications
         // as they would have already been disabled on the follower
@@ -565,16 +635,6 @@ public class Shard extends RaftActor {
         return this.name.toString();
     }
 
-
-    private static class ShardConfigParams extends DefaultConfigParamsImpl {
-        public static final FiniteDuration HEART_BEAT_INTERVAL =
-            new FiniteDuration(500, TimeUnit.MILLISECONDS);
-
-        @Override public FiniteDuration getHeartBeatInterval() {
-            return HEART_BEAT_INTERVAL;
-        }
-    }
-
     private static class ShardCreator implements Creator<Shard> {
 
         private static final long serialVersionUID = 1L;
@@ -598,20 +658,24 @@ public class Shard extends RaftActor {
         }
     }
 
-    @VisibleForTesting NormalizedNode readStore() throws ExecutionException, InterruptedException {
+    @VisibleForTesting
+    NormalizedNode<?,?> readStore(YangInstanceIdentifier id)
+            throws ExecutionException, InterruptedException {
         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
 
         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
-            transaction.read(YangInstanceIdentifier.builder().build());
+            transaction.read(id);
 
-        NormalizedNode<?, ?> node = future.get().get();
+        Optional<NormalizedNode<?, ?>> optional = future.get();
+        NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
 
         transaction.close();
 
         return node;
     }
 
-    @VisibleForTesting void writeToStore(YangInstanceIdentifier id, NormalizedNode node)
+    @VisibleForTesting
+    void writeToStore(YangInstanceIdentifier id, NormalizedNode<?,?> node)
         throws ExecutionException, InterruptedException {
         DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
 
@@ -620,4 +684,8 @@ public class Shard extends RaftActor {
         syncCommitTransaction(transaction);
     }
 
+    @VisibleForTesting
+    ShardStats getShardMBean() {
+        return shardMBean;
+    }
 }
index a97c00f..a8a1823 100644 (file)
@@ -15,11 +15,16 @@ import akka.actor.OneForOneStrategy;
 import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
 import akka.japi.Creator;
 import akka.japi.Function;
+import akka.japi.Procedure;
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.RecoveryFailure;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
-
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
@@ -32,14 +37,18 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
 
+import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * The ShardManager has the following jobs,
@@ -50,7 +59,10 @@ import java.util.Map;
  * <li> Monitor the cluster members and store their addresses
  * <ul>
  */
-public class ShardManager extends AbstractUntypedActorWithMetering {
+public class ShardManager extends AbstractUntypedPersistentActor {
+
+    protected final LoggingAdapter LOG =
+        Logging.getLogger(getContext().system(), this);
 
     // Stores a mapping between a member name and the address of the member
     // Member names look like "member-1", "member-2" etc and are as specified
@@ -74,6 +86,8 @@ public class ShardManager extends AbstractUntypedActorWithMetering {
 
     private final DatastoreContext datastoreContext;
 
+    private final Collection<String> knownModules = new HashSet<>(128);
+
     /**
      * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
      *             configuration or operational
@@ -105,7 +119,7 @@ public class ShardManager extends AbstractUntypedActorWithMetering {
     }
 
     @Override
-    public void handleReceive(Object message) throws Exception {
+    public void handleCommand(Object message) throws Exception {
         if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
             findPrimary(
                 FindPrimary.fromSerializable(message));
@@ -125,6 +139,23 @@ public class ShardManager extends AbstractUntypedActorWithMetering {
 
     }
 
+    @Override protected void handleRecover(Object message) throws Exception {
+
+        if(message instanceof SchemaContextModules){
+            SchemaContextModules msg = (SchemaContextModules) message;
+            knownModules.clear();
+            knownModules.addAll(msg.getModules());
+        } else if(message instanceof RecoveryFailure){
+            RecoveryFailure failure = (RecoveryFailure) message;
+            LOG.error(failure.cause(), "Recovery failed");
+        } else if(message instanceof RecoveryCompleted){
+            LOG.info("Recovery complete : {}", persistenceId());
+
+            // Delete all the messages from the akka journal except the last one
+            deleteMessages(lastSequenceNr() - 1);
+        }
+    }
+
     private void findLocalShard(FindLocalShard message) {
         ShardInformation shardInformation =
             localShards.get(message.getShardName());
@@ -159,16 +190,42 @@ public class ShardManager extends AbstractUntypedActorWithMetering {
      *
      * @param message
      */
-    private void updateSchemaContext(Object message) {
-        SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+    private void updateSchemaContext(final Object message) {
+        final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+
+        Set<ModuleIdentifier> allModuleIdentifiers = schemaContext.getAllModuleIdentifiers();
+        Set<String> newModules = new HashSet<>(128);
+
+        for(ModuleIdentifier moduleIdentifier : allModuleIdentifiers){
+            String s = moduleIdentifier.getNamespace().toString();
+            newModules.add(s);
+        }
+
+        if(newModules.containsAll(knownModules)) {
+
+            LOG.info("New SchemaContext has a super set of current knownModules - persisting info");
+
+            knownModules.clear();
+            knownModules.addAll(newModules);
+
+            persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
 
-        if(localShards.size() == 0){
-            createLocalShards(schemaContext);
+                @Override public void apply(SchemaContextModules param) throws Exception {
+                    LOG.info("Sending new SchemaContext to Shards");
+                    if (localShards.size() == 0) {
+                        createLocalShards(schemaContext);
+                    } else {
+                        for (ShardInformation info : localShards.values()) {
+                            info.getActor().tell(message, getSelf());
+                        }
+                    }
+                }
+
+            });
         } else {
-            for (ShardInformation info : localShards.values()) {
-                info.getActor().tell(message, getSelf());
-            }
+            LOG.info("Rejecting schema context update because it is not a super set of previously known modules");
         }
+
     }
 
     private void findPrimary(FindPrimary message) {
@@ -249,8 +306,8 @@ public class ShardManager extends AbstractUntypedActorWithMetering {
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
             Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
             ActorRef actor = getContext()
-                .actorOf(Shard.props(shardId, peerAddresses, datastoreContext, schemaContext).
-                    withMailbox(ActorContext.MAILBOX), shardId.toString());
+                .actorOf(Shard.props(shardId, peerAddresses, datastoreContext, schemaContext),
+                    shardId.toString());
             localShardActorNames.add(shardId.toString());
             localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
         }
@@ -306,6 +363,14 @@ public class ShardManager extends AbstractUntypedActorWithMetering {
 
     }
 
+    @Override public String persistenceId() {
+        return "shard-manager-" + type;
+    }
+
+    @VisibleForTesting public Collection<String> getKnownModules() {
+        return knownModules;
+    }
+
     private class ShardInformation {
         private final String shardName;
         private final ActorRef actor;
@@ -371,6 +436,18 @@ public class ShardManager extends AbstractUntypedActorWithMetering {
             return new ShardManager(type, cluster, configuration, datastoreContext);
         }
     }
+
+    static class SchemaContextModules implements Serializable {
+        private final Set<String> modules;
+
+        SchemaContextModules(Set<String> modules){
+            this.modules = modules;
+        }
+
+        public Set<String> getModules() {
+            return modules;
+        }
+    }
 }
 
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
new file mode 100644 (file)
index 0000000..8afdb4c
--- /dev/null
@@ -0,0 +1,157 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot
+ * and journal log entry batch are de-serialized and applied to their own write transaction
+ * instance in parallel on a thread pool for faster recovery time. However the transactions are
+ * committed to the data store in the order the corresponding snapshot or log batch are received
+ * to preserve data store integrity.
+ *
+ * @author Thomas Panetelis
+ */
+class ShardRecoveryCoordinator {
+
+    private static final int TIME_OUT = 10;
+
+    private static final Logger LOG = LoggerFactory.getLogger(ShardRecoveryCoordinator.class);
+
+    private final List<DOMStoreWriteTransaction> resultingTxList = Lists.newArrayList();
+    private final SchemaContext schemaContext;
+    private final String shardName;
+    private final ExecutorService executor;
+
+    ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext) {
+        this.schemaContext = schemaContext;
+        this.shardName = shardName;
+
+        executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
+                new ThreadFactoryBuilder().setDaemon(true)
+                        .setNameFormat("ShardRecovery-" + shardName + "-%d").build());
+    }
+
+    /**
+     * Submits a batch of journal log entries.
+     *
+     * @param logEntries the serialized journal log entries
+     * @param resultingTx the write Tx to which to apply the entries
+     */
+    void submit(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
+        LogRecoveryTask task = new LogRecoveryTask(logEntries, resultingTx);
+        resultingTxList.add(resultingTx);
+        executor.execute(task);
+    }
+
+    /**
+     * Submits a snapshot.
+     *
+     * @param snapshot the serialized snapshot
+     * @param resultingTx the write Tx to which to apply the entries
+     */
+    void submit(ByteString snapshot, DOMStoreWriteTransaction resultingTx) {
+        SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshot, resultingTx);
+        resultingTxList.add(resultingTx);
+        executor.execute(task);
+    }
+
+    Collection<DOMStoreWriteTransaction> getTransactions() {
+        // Shutdown the executor and wait for task completion.
+        executor.shutdown();
+
+        try {
+            if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES))  {
+                return resultingTxList;
+            } else {
+                LOG.error("Recovery for shard {} timed out after {} minutes", shardName, TIME_OUT);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+
+        return Collections.emptyList();
+    }
+
+    private static abstract class ShardRecoveryTask implements Runnable {
+
+        final DOMStoreWriteTransaction resultingTx;
+
+        ShardRecoveryTask(DOMStoreWriteTransaction resultingTx) {
+            this.resultingTx = resultingTx;
+        }
+    }
+
+    private class LogRecoveryTask extends ShardRecoveryTask {
+
+        private final List<Object> logEntries;
+
+        LogRecoveryTask(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
+            super(resultingTx);
+            this.logEntries = logEntries;
+        }
+
+        @Override
+        public void run() {
+            for(int i = 0; i < logEntries.size(); i++) {
+                MutableCompositeModification.fromSerializable(
+                        logEntries.get(i), schemaContext).apply(resultingTx);
+                // Null out to GC quicker.
+                logEntries.set(i, null);
+            }
+        }
+    }
+
+    private class SnapshotRecoveryTask extends ShardRecoveryTask {
+
+        private final ByteString snapshot;
+
+        SnapshotRecoveryTask(ByteString snapshot, DOMStoreWriteTransaction resultingTx) {
+            super(resultingTx);
+            this.snapshot = snapshot;
+        }
+
+        @Override
+        public void run() {
+            try {
+                NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
+                NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext).decode(
+                        YangInstanceIdentifier.builder().build(), serializedNode);
+
+                // delete everything first
+                resultingTx.delete(YangInstanceIdentifier.builder().build());
+
+                // Add everything from the remote node back
+                resultingTx.write(YangInstanceIdentifier.builder().build(), node);
+            } catch (InvalidProtocolBufferException e) {
+                LOG.error("Error deserializing snapshot", e);
+            }
+        }
+    }
+}
index e7a7aab..84614bd 100644 (file)
@@ -42,13 +42,16 @@ public class DistributedConfigDataStoreProviderModule extends
 
         DatastoreContext datastoreContext = new DatastoreContext("DistributedConfigDatastore",
                 InMemoryDOMDataStoreConfigProperties.create(
-                        props.getMaxShardDataChangeExecutorPoolSize().getValue(),
-                        props.getMaxShardDataChangeExecutorQueueSize().getValue(),
-                        props.getMaxShardDataChangeListenerQueueSize().getValue(),
-                        props.getMaxShardDataStoreExecutorQueueSize().getValue()),
+                        props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(),
+                        props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(),
+                        props.getMaxShardDataChangeListenerQueueSize().getValue().intValue(),
+                        props.getMaxShardDataStoreExecutorQueueSize().getValue().intValue()),
                 Duration.create(props.getShardTransactionIdleTimeoutInMinutes().getValue(),
                         TimeUnit.MINUTES),
-                props.getOperationTimeoutInSeconds().getValue());
+                props.getOperationTimeoutInSeconds().getValue(),
+                props.getShardJournalRecoveryLogBatchSize().getValue().intValue(),
+                props.getShardSnapshotBatchCount().getValue().intValue(),
+                props.getShardHearbeatIntervalInMillis().getValue());
 
         return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
                 datastoreContext, bundleContext);
index 814e6f6..3183527 100644 (file)
@@ -42,13 +42,16 @@ public class DistributedOperationalDataStoreProviderModule extends
 
         DatastoreContext datastoreContext = new DatastoreContext("DistributedOperationalDatastore",
                 InMemoryDOMDataStoreConfigProperties.create(
-                        props.getMaxShardDataChangeExecutorPoolSize().getValue(),
-                        props.getMaxShardDataChangeExecutorQueueSize().getValue(),
-                        props.getMaxShardDataChangeListenerQueueSize().getValue(),
-                        props.getMaxShardDataStoreExecutorQueueSize().getValue()),
+                        props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(),
+                        props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(),
+                        props.getMaxShardDataChangeListenerQueueSize().getValue().intValue(),
+                        props.getMaxShardDataStoreExecutorQueueSize().getValue().intValue()),
                 Duration.create(props.getShardTransactionIdleTimeoutInMinutes().getValue(),
                         TimeUnit.MINUTES),
-                props.getOperationTimeoutInSeconds().getValue());
+                props.getOperationTimeoutInSeconds().getValue(),
+                props.getShardJournalRecoveryLogBatchSize().getValue().intValue(),
+                props.getShardSnapshotBatchCount().getValue().intValue(),
+                props.getShardHearbeatIntervalInMillis().getValue());
 
         return DistributedDataStoreFactory.createInstance("operational",
                 getOperationalSchemaServiceDependency(), datastoreContext, bundleContext);
index e19a767..af43f95 100644 (file)
@@ -36,8 +36,8 @@ module distributed-datastore-provider {
                 config:java-name-prefix DistributedOperationalDataStoreProvider;
      }
 
-    typedef non-zero-uint16-type {
-        type uint16 {
+    typedef non-zero-uint32-type {
+        type uint32 {
             range "1..max";
         }
     }
@@ -48,43 +48,67 @@ module distributed-datastore-provider {
         }
     }
 
+    typedef heartbeat-interval-type {
+        type uint16 {
+            range "100..max";
+        }
+    }
+
     grouping data-store-properties {
         leaf max-shard-data-change-executor-queue-size {
             default 1000;
-            type non-zero-uint16-type;
+            type non-zero-uint32-type;
             description "The maximum queue size for each shard's data store data change notification executor.";
          }
 
          leaf max-shard-data-change-executor-pool-size {
             default 20;
-            type non-zero-uint16-type;
+            type non-zero-uint32-type;
             description "The maximum thread pool size for each shard's data store data change notification executor.";
          }
 
          leaf max-shard-data-change-listener-queue-size {
             default 1000;
-            type non-zero-uint16-type;
+            type non-zero-uint32-type;
             description "The maximum queue size for each shard's data store data change listeners.";
          }
 
          leaf max-shard-data-store-executor-queue-size {
             default 5000;
-            type non-zero-uint16-type;
+            type non-zero-uint32-type;
             description "The maximum queue size for each shard's data store executor.";
          }
 
          leaf shard-transaction-idle-timeout-in-minutes {
             default 10;
-            type non-zero-uint16-type;
+            type non-zero-uint32-type;
             description "The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs.";
          }
 
+         leaf shard-snapshot-batch-count {
+            default 20000;
+            type non-zero-uint32-type;
+            description "The minimum number of entries to be present in the in-memory journal log before a snapshot to be taken.";
+         }
+
+         leaf shard-hearbeat-interval-in-millis {
+            default 500;
+            type heartbeat-interval-type;
+            description "The interval at which a shard will send a heart beat message to its remote shard.";
+         }
+
          leaf operation-timeout-in-seconds {
             default 5;
             type operation-timeout-type;
             description "The maximum amount of time for akka operations (remote or local) to complete before failing.";
          }
 
+         leaf shard-journal-recovery-log-batch-size {
+            default 5000;
+            type non-zero-uint32-type;
+            description "The maximum number of journal log entries to batch on recovery for a shard before committing to the data store.";
+         }
+
          leaf enable-metric-capture {
             default false;
             type boolean;
@@ -93,7 +117,7 @@ module distributed-datastore-provider {
 
          leaf bounded-mailbox-capacity {
              default 1000;
-             type non-zero-uint16-type;
+             type non-zero-uint32-type;
              description "Max queue size that an actor's mailbox can reach";
          }
     }
index 022ef9b..fae21f2 100644 (file)
@@ -10,11 +10,10 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
-import org.apache.commons.io.FileUtils;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
-import java.io.File;
 import java.io.IOException;
 
 public abstract class AbstractActorTest {
@@ -25,35 +24,15 @@ public abstract class AbstractActorTest {
 
         System.setProperty("shard.persistent", "false");
         system = ActorSystem.create("test");
-
-        deletePersistenceFiles();
     }
 
     @AfterClass
     public static void tearDownClass() throws IOException {
         JavaTestKit.shutdownActorSystem(system);
         system = null;
-
-        deletePersistenceFiles();
-    }
-
-    protected static void deletePersistenceFiles() throws IOException {
-        File journal = new File("journal");
-
-        if(journal.exists()) {
-            FileUtils.deleteDirectory(journal);
-        }
-
-        File snapshots = new File("snapshots");
-
-        if(snapshots.exists()){
-            FileUtils.deleteDirectory(snapshots);
-        }
-
     }
 
     protected ActorSystem getSystem() {
         return system;
     }
-
 }
index 02201f7..8a3cdd0 100644 (file)
@@ -3,10 +3,23 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import akka.dispatch.Futures;
+import akka.japi.Procedure;
+import akka.persistence.PersistentConfirmation;
+import akka.persistence.PersistentId;
+import akka.persistence.PersistentImpl;
+import akka.persistence.PersistentRepr;
+import akka.persistence.journal.japi.AsyncWriteJournal;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
-import junit.framework.Assert;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
@@ -19,17 +32,41 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import scala.concurrent.duration.Duration;
+import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Future;
+
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 
 import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class ShardManagerTest {
     private static ActorSystem system;
 
     @BeforeClass
-    public static void setUp() {
-        system = ActorSystem.create("test");
+    public static void setUpClass() {
+        Map<String, String> myJournal = new HashMap<>();
+        myJournal.put("class", "org.opendaylight.controller.cluster.datastore.ShardManagerTest$MyJournal");
+        myJournal.put("plugin-dispatcher", "akka.actor.default-dispatcher");
+        Config config = ConfigFactory.load()
+            .withValue("akka.persistence.journal.plugin",
+                ConfigValueFactory.fromAnyRef("my-journal"))
+            .withValue("my-journal", ConfigValueFactory.fromMap(myJournal));
+
+        MyJournal.clear();
+
+        system = ActorSystem.create("test", config);
     }
 
     @AfterClass
@@ -38,29 +75,27 @@ public class ShardManagerTest {
         system = null;
     }
 
+    @Before
+    public void setUpTest(){
+        MyJournal.clear();
+    }
+
     @Test
     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
 
-        new JavaTestKit(system) {{
-            final Props props = ShardManager
-                .props("config", new MockClusterWrapper(),
-                    new MockConfiguration(), new DatastoreContext());
-            final TestActorRef<ShardManager> subject =
-                TestActorRef.create(system, props);
+        new JavaTestKit(system) {
+            {
+                final Props props = ShardManager
+                    .props("config", new MockClusterWrapper(),
+                        new MockConfiguration(), new DatastoreContext());
 
-            new Within(duration("10 seconds")) {
-                @Override
-                protected void run() {
+                final ActorRef subject = getSystem().actorOf(props);
 
-                    subject.tell(new FindPrimary("inventory").toSerializable(), getRef());
+                subject.tell(new FindPrimary("inventory").toSerializable(), getRef());
 
-                    expectMsgEquals(Duration.Zero(),
-                        new PrimaryNotFound("inventory").toSerializable());
-
-                    expectNoMsg();
-                }
-            };
-        }};
+                expectMsgEquals(duration("2 seconds"),
+                    new PrimaryNotFound("inventory").toSerializable());
+            }};
     }
 
     @Test
@@ -70,22 +105,14 @@ public class ShardManagerTest {
             final Props props = ShardManager
                 .props("config", new MockClusterWrapper(),
                     new MockConfiguration(), new DatastoreContext());
-            final TestActorRef<ShardManager> subject =
-                TestActorRef.create(system, props);
-
-            subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-            new Within(duration("10 seconds")) {
-                @Override
-                protected void run() {
+            final ActorRef subject = getSystem().actorOf(props);
 
-                    subject.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
+            subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-                    expectMsgClass(duration("1 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+            subject.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
 
-                    expectNoMsg();
-                }
-            };
+            expectMsgClass(duration("1 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
         }};
     }
 
@@ -96,31 +123,23 @@ public class ShardManagerTest {
             final Props props = ShardManager
                 .props("config", new MockClusterWrapper(),
                     new MockConfiguration(), new DatastoreContext());
-            final TestActorRef<ShardManager> subject =
-                TestActorRef.create(system, props);
 
-            new Within(duration("10 seconds")) {
-                @Override
-                protected void run() {
-
-                    subject.tell(new FindLocalShard("inventory"), getRef());
-
-                    final String out = new ExpectMsg<String>(duration("10 seconds"), "find local") {
-                        @Override
-                        protected String match(Object in) {
-                            if (in instanceof LocalShardNotFound) {
-                                return ((LocalShardNotFound) in).getShardName();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get(); // this extracts the received message
+            final ActorRef subject = getSystem().actorOf(props);
 
-                    assertEquals("inventory", out);
+            subject.tell(new FindLocalShard("inventory"), getRef());
 
-                    expectNoMsg();
+            final String out = new ExpectMsg<String>(duration("3 seconds"), "find local") {
+                @Override
+                protected String match(Object in) {
+                    if (in instanceof LocalShardNotFound) {
+                        return ((LocalShardNotFound) in).getShardName();
+                    } else {
+                        throw noMatch();
+                    }
                 }
-            };
+            }.get(); // this extracts the received message
+
+            assertEquals("inventory", out);
         }};
     }
 
@@ -133,40 +152,109 @@ public class ShardManagerTest {
             final Props props = ShardManager
                 .props("config", mockClusterWrapper,
                     new MockConfiguration(), new DatastoreContext());
-            final TestActorRef<ShardManager> subject =
-                TestActorRef.create(system, props);
+
+            final ActorRef subject = getSystem().actorOf(props);
 
             subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-            new Within(duration("10 seconds")) {
+            subject.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
+
+            final ActorRef out = new ExpectMsg<ActorRef>(duration("3 seconds"), "find local") {
                 @Override
-                protected void run() {
-
-                    subject.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
-
-                    final ActorRef out = new ExpectMsg<ActorRef>(duration("10 seconds"), "find local") {
-                        @Override
-                        protected ActorRef match(Object in) {
-                            if (in instanceof LocalShardFound) {
-                                return ((LocalShardFound) in).getPath();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get(); // this extracts the received message
+                protected ActorRef match(Object in) {
+                    if (in instanceof LocalShardFound) {
+                        return ((LocalShardFound) in).getPath();
+                    } else {
+                        throw noMatch();
+                    }
+                }
+            }.get(); // this extracts the received message
+
+            assertTrue(out.path().toString(),
+                out.path().toString().contains("member-1-shard-default-config"));
+        }};
+    }
+
+    @Test
+    public void testOnReceiveMemberUp() throws Exception {
+
+        new JavaTestKit(system) {{
+            final Props props = ShardManager
+                .props("config", new MockClusterWrapper(),
+                    new MockConfiguration(), new DatastoreContext());
 
-                    assertTrue(out.path().toString(), out.path().toString().contains("member-1-shard-default-config"));
+            final ActorRef subject = getSystem().actorOf(props);
 
+            MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
 
-                    expectNoMsg();
+            subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+
+            final String out = new ExpectMsg<String>(duration("3 seconds"), "primary found") {
+                // do not put code outside this method, will run afterwards
+                @Override
+                protected String match(Object in) {
+                    if (in.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
+                        PrimaryFound f = PrimaryFound.fromSerializable(in);
+                        return f.getPrimaryPath();
+                    } else {
+                        throw noMatch();
+                    }
                 }
-            };
+            }.get(); // this extracts the received message
+
+            assertTrue(out, out.contains("member-2-shard-astronauts-config"));
         }};
     }
 
     @Test
-    public void testOnReceiveMemberUp() throws Exception {
+    public void testOnReceiveMemberDown() throws Exception {
 
+        new JavaTestKit(system) {{
+            final Props props = ShardManager
+                .props("config", new MockClusterWrapper(),
+                    new MockConfiguration(), new DatastoreContext());
+
+            final ActorRef subject = getSystem().actorOf(props);
+
+            MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
+
+            subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+
+            expectMsgClass(duration("3 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+
+            MockClusterWrapper.sendMemberRemoved(subject, "member-2", getRef().path().toString());
+
+            subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+
+            expectMsgClass(duration("1 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
+        }};
+    }
+
+    @Test
+    public void testOnRecoveryJournalIsEmptied(){
+        MyJournal.addToJournal(1L, new ShardManager.SchemaContextModules(
+            ImmutableSet.of("foo")));
+
+        assertEquals(1, MyJournal.get().size());
+
+        new JavaTestKit(system) {{
+            final Props props = ShardManager
+                .props("config", new MockClusterWrapper(),
+                    new MockConfiguration(), new DatastoreContext());
+
+            final ActorRef subject = getSystem().actorOf(props);
+
+            // Send message to check that ShardManager is ready
+            subject.tell(new FindPrimary("unknown").toSerializable(), getRef());
+
+            expectMsgClass(duration("3 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
+
+            assertEquals(0, MyJournal.get().size());
+        }};
+    }
+
+    @Test
+    public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
         new JavaTestKit(system) {{
             final Props props = ShardManager
                 .props("config", new MockClusterWrapper(),
@@ -174,39 +262,63 @@ public class ShardManagerTest {
             final TestActorRef<ShardManager> subject =
                 TestActorRef.create(system, props);
 
-            // the run() method needs to finish within 3 seconds
-            new Within(duration("10 seconds")) {
-                @Override
-                protected void run() {
-
-                    MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
-
-                    subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
-
-                    final String out = new ExpectMsg<String>(duration("1 seconds"), "primary found") {
-                        // do not put code outside this method, will run afterwards
-                        @Override
-                        protected String match(Object in) {
-                            if (in.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
-                                PrimaryFound f = PrimaryFound.fromSerializable(in);
-                                return f.getPrimaryPath();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get(); // this extracts the received message
+            subject.underlyingActor().onReceiveRecover(new ShardManager.SchemaContextModules(ImmutableSet.of("foo")));
 
-                    Assert.assertTrue(out, out.contains("member-2-shard-astronauts-config"));
+            Collection<String> knownModules = subject.underlyingActor().getKnownModules();
 
-                    expectNoMsg();
-                }
-            };
+            assertTrue(knownModules.contains("foo"));
         }};
     }
 
     @Test
-    public void testOnReceiveMemberDown() throws Exception {
+    public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
+        throws Exception {
+        new JavaTestKit(system) {{
+            final Props props = ShardManager
+                .props("config", new MockClusterWrapper(),
+                    new MockConfiguration(), new DatastoreContext());
+            final TestActorRef<ShardManager> subject =
+                TestActorRef.create(system, props);
+
+            Collection<String> knownModules = subject.underlyingActor().getKnownModules();
+
+            assertEquals(0, knownModules.size());
+
+            SchemaContext schemaContext = mock(SchemaContext.class);
+            Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
+
+            ModuleIdentifier foo = mock(ModuleIdentifier.class);
+            when(foo.getNamespace()).thenReturn(new URI("foo"));
+
+            moduleIdentifierSet.add(foo);
+
+            when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
+
+            subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+            assertTrue(knownModules.contains("foo"));
+
+            assertEquals(1, knownModules.size());
+
+            ModuleIdentifier bar = mock(ModuleIdentifier.class);
+            when(bar.getNamespace()).thenReturn(new URI("bar"));
+
+            moduleIdentifierSet.add(bar);
+
+            subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+            assertTrue(knownModules.contains("bar"));
 
+            assertEquals(2, knownModules.size());
+
+        }};
+
+    }
+
+
+    @Test
+    public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
+        throws Exception {
         new JavaTestKit(system) {{
             final Props props = ShardManager
                 .props("config", new MockClusterWrapper(),
@@ -214,28 +326,117 @@ public class ShardManagerTest {
             final TestActorRef<ShardManager> subject =
                 TestActorRef.create(system, props);
 
-            // the run() method needs to finish within 3 seconds
-            new Within(duration("10 seconds")) {
-                @Override
-                protected void run() {
+            Collection<String> knownModules = subject.underlyingActor().getKnownModules();
 
-                    MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
+            assertEquals(0, knownModules.size());
 
-                    subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+            SchemaContext schemaContext = mock(SchemaContext.class);
+            Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
 
-                    expectMsgClass(duration("1 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+            ModuleIdentifier foo = mock(ModuleIdentifier.class);
+            when(foo.getNamespace()).thenReturn(new URI("foo"));
 
-                    MockClusterWrapper.sendMemberRemoved(subject, "member-2", getRef().path().toString());
+            moduleIdentifierSet.add(foo);
 
-                    subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+            when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
 
-                    expectMsgClass(duration("1 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
+            subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+            assertTrue(knownModules.contains("foo"));
+
+            assertEquals(1, knownModules.size());
+
+            //Create a completely different SchemaContext with only the bar module in it
+            schemaContext = mock(SchemaContext.class);
+            moduleIdentifierSet = new HashSet<>();
+            ModuleIdentifier bar = mock(ModuleIdentifier.class);
+            when(bar.getNamespace()).thenReturn(new URI("bar"));
+
+            moduleIdentifierSet.add(bar);
+
+            subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+            assertFalse(knownModules.contains("bar"));
+
+            assertEquals(1, knownModules.size());
 
-                    expectNoMsg();
-                }
-            };
         }};
+
+    }
+
+
+    private void sleep(long period){
+        Uninterruptibles.sleepUninterruptibly(period, TimeUnit.MILLISECONDS);
     }
 
+    public static class MyJournal extends AsyncWriteJournal {
+
+        private static Map<Long, Object> journal = Maps.newTreeMap();
+
+        public static void addToJournal(Long sequenceNr, Object value){
+            journal.put(sequenceNr, value);
+        }
+
+        public static Map<Long, Object> get(){
+            return journal;
+        }
+
+        public static void clear(){
+            journal.clear();
+        }
 
+        @Override public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, long toSequenceNr, long max,
+            final Procedure<PersistentRepr> replayCallback) {
+            if(journal.size() == 0){
+                return Futures.successful(null);
+            }
+            return Futures.future(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    for (Map.Entry<Long, Object> entry : journal.entrySet()) {
+                        PersistentRepr persistentMessage =
+                            new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId,
+                                false, null, null);
+                        replayCallback.apply(persistentMessage);
+                    }
+                    return null;
+                }
+            }, context().dispatcher());
+        }
+
+        @Override public Future<Long> doAsyncReadHighestSequenceNr(String s, long l) {
+            return Futures.successful(-1L);
+        }
+
+        @Override public Future<Void> doAsyncWriteMessages(
+            final Iterable<PersistentRepr> persistentReprs) {
+            return Futures.future(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    for (PersistentRepr repr : persistentReprs){
+                        if(repr.payload() instanceof ShardManager.SchemaContextModules) {
+                            journal.put(repr.sequenceNr(), repr.payload());
+                        }
+                    }
+                    return null;
+                }
+            }, context().dispatcher());
+        }
+
+        @Override public Future<Void> doAsyncWriteConfirmations(
+            Iterable<PersistentConfirmation> persistentConfirmations) {
+            return Futures.successful(null);
+        }
+
+        @Override public Future<Void> doAsyncDeleteMessages(Iterable<PersistentId> persistentIds,
+            boolean b) {
+            clear();
+            return Futures.successful(null);
+        }
+
+        @Override public Future<Void> doAsyncDeleteMessagesTo(String s, long l, boolean b) {
+            clear();
+            return Futures.successful(null);
+        }
+    }
 }
index deb71c2..a3e0b3a 100644 (file)
@@ -4,23 +4,43 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.event.Logging;
+import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
+import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
@@ -28,222 +48,138 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.duration.Duration;
 import java.io.IOException;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-
+import java.util.concurrent.TimeUnit;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
 
 public class ShardTest extends AbstractActorTest {
 
-    private static final DatastoreContext DATA_STORE_CONTEXT = new DatastoreContext();
+    private static final DatastoreContext DATA_STORE_CONTEXT =
+            new DatastoreContext("", null, Duration.create(10, TimeUnit.MINUTES), 5, 3, 5000, 500);
 
-    @Test
-    public void testOnReceiveRegisterListener() throws Exception {
-        new JavaTestKit(getSystem()) {{
-            final ShardIdentifier identifier =
-                ShardIdentifier.builder().memberName("member-1")
-                    .shardName("inventory").type("config").build();
+    private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
 
-            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
-            final ActorRef subject =
-                getSystem().actorOf(props, "testRegisterChangeListener");
+    private static final ShardIdentifier IDENTIFIER = ShardIdentifier.builder().memberName("member-1")
+            .shardName("inventory").type("config").build();
 
-            new Within(duration("3 seconds")) {
-                @Override
-                protected void run() {
+    @Before
+    public void setUp() {
+        System.setProperty("shard.persistent", "false");
 
-                    subject.tell(
-                        new UpdateSchemaContext(SchemaContextHelper.full()),
-                        getRef());
+        InMemorySnapshotStore.clear();
+        InMemoryJournal.clear();
+    }
 
-                    subject.tell(new RegisterChangeListener(TestModel.TEST_PATH,
-                        getRef().path(), AsyncDataBroker.DataChangeScope.BASE),
-                        getRef());
+    @After
+    public void tearDown() {
+        InMemorySnapshotStore.clear();
+        InMemoryJournal.clear();
+    }
 
-                    final Boolean notificationEnabled = new ExpectMsg<Boolean>(
-                                                   duration("3 seconds"), "enable notification") {
-                        // do not put code outside this method, will run afterwards
-                        @Override
-                        protected Boolean match(Object in) {
-                            if(in instanceof EnableNotification){
-                                return ((EnableNotification) in).isEnabled();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get(); // this extracts the received message
-
-                    assertFalse(notificationEnabled);
-
-                    final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
-                        // do not put code outside this method, will run afterwards
-                        @Override
-                        protected String match(Object in) {
-                            if (in.getClass().equals(RegisterChangeListenerReply.class)) {
-                                RegisterChangeListenerReply reply =
-                                    (RegisterChangeListenerReply) in;
-                                return reply.getListenerRegistrationPath()
-                                    .toString();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get(); // this extracts the received message
+    private Props newShardProps() {
+        return Shard.props(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
+                DATA_STORE_CONTEXT, SCHEMA_CONTEXT);
+    }
 
-                    assertTrue(out.matches(
-                        "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
-                }
+    @Test
+    public void testOnReceiveRegisterListener() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            ActorRef subject = getSystem().actorOf(newShardProps(), "testRegisterChangeListener");
 
+            subject.tell(new UpdateSchemaContext(SchemaContextHelper.full()), getRef());
 
-            };
+            subject.tell(new RegisterChangeListener(TestModel.TEST_PATH,
+                    getRef().path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
+
+            EnableNotification enable = expectMsgClass(duration("3 seconds"), EnableNotification.class);
+            assertEquals("isEnabled", false, enable.isEnabled());
+
+            RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
+                    RegisterChangeListenerReply.class);
+            assertTrue(reply.getListenerRegistrationPath().toString().matches(
+                    "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
         }};
     }
 
     @Test
     public void testCreateTransaction(){
-        new JavaTestKit(getSystem()) {{
-            final ShardIdentifier identifier =
-                ShardIdentifier.builder().memberName("member-1")
-                    .shardName("inventory").type("config").build();
-
-            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
-            final ActorRef subject =
-                getSystem().actorOf(props, "testCreateTransaction");
-
-            // Wait for a specific log message to show up
-            final boolean result =
-                new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
-                ) {
-                    @Override
-                    protected Boolean run() {
-                        return true;
-                    }
-                }.from(subject.path().toString())
-                    .message("Switching from state Candidate to Leader")
-                    .occurrences(1).exec();
-
-            Assert.assertEquals(true, result);
+        new ShardTestKit(getSystem()) {{
+            ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateTransaction");
 
-            new Within(duration("3 seconds")) {
-                @Override
-                protected void run() {
+            waitUntilLeader(subject);
 
-                    subject.tell(
-                        new UpdateSchemaContext(TestModel.createTestContext()),
-                        getRef());
+            subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-                    subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(),
-                        getRef());
+            subject.tell(new CreateTransaction("txn-1",
+                    TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
 
-                    final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
-                        // do not put code outside this method, will run afterwards
-                        @Override
-                        protected String match(Object in) {
-                            if (in instanceof CreateTransactionReply) {
-                                CreateTransactionReply reply =
-                                    (CreateTransactionReply) in;
-                                return reply.getTransactionActorPath()
-                                    .toString();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get(); // this extracts the received message
+            CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
+                    CreateTransactionReply.class);
 
-                    assertTrue("Unexpected transaction path " + out,
-                        out.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
-                    expectNoMsg();
-                }
-            };
+            String path = reply.getTransactionActorPath().toString();
+            assertTrue("Unexpected transaction path " + path,
+                    path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
+            expectNoMsg();
         }};
     }
 
     @Test
     public void testCreateTransactionOnChain(){
-        new JavaTestKit(getSystem()) {{
-            final ShardIdentifier identifier =
-                ShardIdentifier.builder().memberName("member-1")
-                    .shardName("inventory").type("config").build();
-
-            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
-            final ActorRef subject =
-                getSystem().actorOf(props, "testCreateTransactionOnChain");
-
-            // Wait for a specific log message to show up
-            final boolean result =
-                new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
-                ) {
-                    @Override
-                    protected Boolean run() {
-                        return true;
-                    }
-                }.from(subject.path().toString())
-                    .message("Switching from state Candidate to Leader")
-                    .occurrences(1).exec();
-
-            Assert.assertEquals(true, result);
-
-            new Within(duration("3 seconds")) {
-                @Override
-                protected void run() {
+        new ShardTestKit(getSystem()) {{
+            final ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
 
-                    subject.tell(
-                        new UpdateSchemaContext(TestModel.createTestContext()),
-                        getRef());
+            waitUntilLeader(subject);
 
-                    subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
-                        getRef());
+            subject.tell(new CreateTransaction("txn-1",
+                    TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
+                    getRef());
 
-                    final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
-                        // do not put code outside this method, will run afterwards
-                        @Override
-                        protected String match(Object in) {
-                            if (in instanceof CreateTransactionReply) {
-                                CreateTransactionReply reply =
-                                    (CreateTransactionReply) in;
-                                return reply.getTransactionActorPath()
-                                    .toString();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get(); // this extracts the received message
+            CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
+                    CreateTransactionReply.class);
 
-                    assertTrue("Unexpected transaction path " + out,
-                        out.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
-                    expectNoMsg();
-                }
-            };
+            String path = reply.getTransactionActorPath().toString();
+            assertTrue("Unexpected transaction path " + path,
+                    path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
+            expectNoMsg();
         }};
     }
 
     @Test
     public void testPeerAddressResolved(){
         new JavaTestKit(getSystem()) {{
-            Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
-
             final ShardIdentifier identifier =
                 ShardIdentifier.builder().memberName("member-1")
                     .shardName("inventory").type("config").build();
 
-            peerAddresses.put(identifier, null);
-            final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT, TestModel.createTestContext());
-            final ActorRef subject =
-                getSystem().actorOf(props, "testPeerAddressResolved");
+            Props props = Shard.props(identifier,
+                    Collections.<ShardIdentifier, String>singletonMap(identifier, null),
+                    DATA_STORE_CONTEXT, SCHEMA_CONTEXT);
+            final ActorRef subject = getSystem().actorOf(props, "testPeerAddressResolved");
 
             new Within(duration("3 seconds")) {
                 @Override
@@ -261,99 +197,205 @@ public class ShardTest extends AbstractActorTest {
 
     @Test
     public void testApplySnapshot() throws ExecutionException, InterruptedException {
-        Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
+        TestActorRef<Shard> ref = TestActorRef.create(getSystem(), newShardProps());
 
-        final ShardIdentifier identifier =
-            ShardIdentifier.builder().memberName("member-1")
-                .shardName("inventory").type("config").build();
+        NormalizedNodeToNodeCodec codec =
+            new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
 
-        peerAddresses.put(identifier, null);
-        final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT, TestModel.createTestContext());
+        ref.underlyingActor().writeToStore(TestModel.TEST_PATH, ImmutableNodes.containerNode(
+                TestModel.TEST_QNAME));
 
-        TestActorRef<Shard> ref = TestActorRef.create(getSystem(), props);
+        YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
+        NormalizedNode<?,?> expected = ref.underlyingActor().readStore(root);
 
-        ref.underlyingActor().updateSchemaContext(TestModel.createTestContext());
+        NormalizedNodeMessages.Container encode = codec.encode(root, expected);
 
-        NormalizedNodeToNodeCodec codec =
-            new NormalizedNodeToNodeCodec(TestModel.createTestContext());
+        ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
+                encode.getNormalizedNode().toByteString().toByteArray(),
+                Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
 
-        ref.underlyingActor().writeToStore(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+        ref.underlyingActor().onReceiveCommand(applySnapshot);
 
-        NormalizedNode expected = ref.underlyingActor().readStore();
+        NormalizedNode<?,?> actual = ref.underlyingActor().readStore(root);
 
-        NormalizedNodeMessages.Container encode = codec
-            .encode(YangInstanceIdentifier.builder().build(), expected);
+        assertEquals(expected, actual);
+    }
 
+    @Test
+    public void testApplyState() throws Exception {
 
-        ref.underlyingActor().applySnapshot(encode.getNormalizedNode().toByteString());
+        TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps());
 
-        NormalizedNode actual = ref.underlyingActor().readStore();
+        NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        assertEquals(expected, actual);
-    }
+        MutableCompositeModification compMod = new MutableCompositeModification();
+        compMod.addModification(new WriteModification(TestModel.TEST_PATH, node, SCHEMA_CONTEXT));
+        Payload payload = new CompositeModificationPayload(compMod.toSerializable());
+        ApplyState applyState = new ApplyState(null, "test",
+                new ReplicatedLogImplEntry(1, 2, payload));
 
-    private static class ShardTestKit extends JavaTestKit {
+        shard.underlyingActor().onReceiveCommand(applyState);
 
-        private ShardTestKit(ActorSystem actorSystem) {
-            super(actorSystem);
+        NormalizedNode<?,?> actual = shard.underlyingActor().readStore(TestModel.TEST_PATH);
+        assertEquals("Applied state", node, actual);
+    }
+
+    @SuppressWarnings("serial")
+    @Test
+    public void testRecovery() throws Exception {
+
+        // Set up the InMemorySnapshotStore.
+
+        InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
+        testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
+
+        DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction();
+        writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+        DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready();
+        commitCohort.preCommit().get();
+        commitCohort.commit().get();
+
+        DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
+        NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
+
+        InMemorySnapshotStore.addSnapshot(IDENTIFIER.toString(), Snapshot.create(
+                new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
+                        YangInstanceIdentifier.builder().build(), root).
+                                getNormalizedNode().toByteString().toByteArray(),
+                                Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+
+        // Set up the InMemoryJournal.
+
+        InMemoryJournal.addEntry(IDENTIFIER.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
+                  new WriteModification(TestModel.OUTER_LIST_PATH,
+                          ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
+                          SCHEMA_CONTEXT))));
+
+        int nListEntries = 11;
+        Set<Integer> listEntryKeys = new HashSet<>();
+        for(int i = 1; i <= nListEntries; i++) {
+            listEntryKeys.add(Integer.valueOf(i));
+            YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
+            Modification mod = new MergeModification(path,
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
+                    SCHEMA_CONTEXT);
+            InMemoryJournal.addEntry(IDENTIFIER.toString(), i, new ReplicatedLogImplEntry(i, 1,
+                    newPayload(mod)));
         }
 
-        protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
-            // Wait for a specific log message to show up
-            final boolean result =
-                new JavaTestKit.EventFilter<Boolean>(logLevel
-                ) {
+        InMemoryJournal.addEntry(IDENTIFIER.toString(), nListEntries + 1,
+                new ApplyLogEntries(nListEntries));
+
+        // Create the actor and wait for recovery complete.
+
+        final CountDownLatch recoveryComplete = new CountDownLatch(1);
+
+        Creator<Shard> creator = new Creator<Shard>() {
+            @Override
+            public Shard create() throws Exception {
+                return new Shard(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
+                        DATA_STORE_CONTEXT, SCHEMA_CONTEXT) {
                     @Override
-                    protected Boolean run() {
-                        return true;
+                    protected void onRecoveryComplete() {
+                        try {
+                            super.onRecoveryComplete();
+                        } finally {
+                            recoveryComplete.countDown();
+                        }
                     }
-                }.from(subject.path().toString())
-                    .message(logMessage)
-                    .occurrences(1).exec();
+                };
+            }
+        };
 
-            Assert.assertEquals(true, result);
+        TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                Props.create(new DelegatingShardCreator(creator)), "testRecovery");
+
+        assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
+
+        // Verify data in the data store.
+
+        NormalizedNode<?, ?> outerList = shard.underlyingActor().readStore(TestModel.OUTER_LIST_PATH);
+        assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
+        assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
+                outerList.getValue() instanceof Iterable);
+        for(Object entry: (Iterable<?>) outerList.getValue()) {
+            assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
+                    entry instanceof MapEntryNode);
+            MapEntryNode mapEntry = (MapEntryNode)entry;
+            Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
+                    mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
+            assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
+            Object value = idLeaf.get().getValue();
+            assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
+                    listEntryKeys.remove(value));
+        }
 
+        if(!listEntryKeys.isEmpty()) {
+            fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
+                    listEntryKeys);
         }
 
+        assertEquals("Last log index", nListEntries,
+                shard.underlyingActor().getShardMBean().getLastLogIndex());
+        assertEquals("Commit index", nListEntries,
+                shard.underlyingActor().getShardMBean().getCommitIndex());
+        assertEquals("Last applied", nListEntries,
+                shard.underlyingActor().getShardMBean().getLastApplied());
     }
 
+    private CompositeModificationPayload newPayload(Modification... mods) {
+        MutableCompositeModification compMod = new MutableCompositeModification();
+        for(Modification mod: mods) {
+            compMod.addModification(mod);
+        }
+
+        return new CompositeModificationPayload(compMod.toSerializable());
+    }
+
+    @SuppressWarnings("unchecked")
     @Test
-    public void testCreateSnapshot() throws IOException, InterruptedException {
+    public void testForwardedCommitTransactionWithPersistence() throws IOException {
+        System.setProperty("shard.persistent", "true");
+
         new ShardTestKit(getSystem()) {{
-            final ShardIdentifier identifier =
-                ShardIdentifier.builder().memberName("member-1")
-                    .shardName("inventory").type("config").build();
+            TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps());
 
-            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
-            final ActorRef subject =
-                getSystem().actorOf(props, "testCreateSnapshot");
+            waitUntilLeader(shard);
 
-            // Wait for a specific log message to show up
-            this.waitForLogMessage(Logging.Info.class, subject, "Switching from state Candidate to Leader");
+            NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
+            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class);
+            doReturn(Futures.immediateFuture(null)).when(cohort).commit();
 
-            new Within(duration("3 seconds")) {
-                @Override
-                protected void run() {
+            MutableCompositeModification modification = new MutableCompositeModification();
+            modification.addModification(new WriteModification(TestModel.TEST_PATH, node,
+                    SCHEMA_CONTEXT));
 
-                    subject.tell(
-                        new UpdateSchemaContext(TestModel.createTestContext()),
-                        getRef());
+            shard.tell(new ForwardedCommitTransaction(cohort, modification), getRef());
 
-                    subject.tell(new CaptureSnapshot(-1,-1,-1,-1),
-                        getRef());
+            expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
 
-                    waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
+            verify(cohort).commit();
 
-                    subject.tell(new CaptureSnapshot(-1,-1,-1,-1),
-                        getRef());
+            assertEquals("Last log index", 0, shard.underlyingActor().getShardMBean().getLastLogIndex());
+        }};
+    }
 
-                    waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
+    @Test
+    public void testCreateSnapshot() throws IOException, InterruptedException {
+        new ShardTestKit(getSystem()) {{
+            final ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateSnapshot");
 
-                }
-            };
+            waitUntilLeader(subject);
+
+            subject.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
+
+            waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
+
+            subject.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
 
-            deletePersistenceFiles();
+            waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
         }};
     }
 
@@ -366,7 +408,7 @@ public class ShardTest extends AbstractActorTest {
         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.listeningDecorator(
             MoreExecutors.sameThreadExecutor()), MoreExecutors.sameThreadExecutor());
 
-        store.onGlobalContextUpdated(TestModel.createTestContext());
+        store.onGlobalContextUpdated(SCHEMA_CONTEXT);
 
         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
         putTransaction.write(TestModel.TEST_PATH,
@@ -424,4 +466,46 @@ public class ShardTest extends AbstractActorTest {
             }
         };
     }
+
+    private static final class DelegatingShardCreator implements Creator<Shard> {
+        private final Creator<Shard> delegate;
+
+        DelegatingShardCreator(Creator<Shard> delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public Shard create() throws Exception {
+            return delegate.create();
+        }
+    }
+
+    private static class ShardTestKit extends JavaTestKit {
+
+        private ShardTestKit(ActorSystem actorSystem) {
+            super(actorSystem);
+        }
+
+        protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
+            // Wait for a specific log message to show up
+            final boolean result =
+                new JavaTestKit.EventFilter<Boolean>(logLevel
+                ) {
+                    @Override
+                    protected Boolean run() {
+                        return true;
+                    }
+                }.from(subject.path().toString())
+                    .message(logMessage)
+                    .occurrences(1).exec();
+
+            Assert.assertEquals(true, result);
+
+        }
+
+        protected void waitUntilLeader(ActorRef subject) {
+            waitForLogMessage(Logging.Info.class, subject,
+                    "Switching from state Candidate to Leader");
+        }
+    }
 }
index 0beb00b..3f31591 100644 (file)
@@ -503,7 +503,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
         datastoreContext = new DatastoreContext("Test",
                 InMemoryDOMDataStoreConfigProperties.getDefault(),
-                Duration.create(500, TimeUnit.MILLISECONDS), 5);
+                Duration.create(500, TimeUnit.MILLISECONDS), 5, 1000, 1000, 500);
 
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemoryJournal.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemoryJournal.java
new file mode 100644 (file)
index 0000000..c9a0eaf
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.collect.Maps;
+import scala.concurrent.Future;
+import akka.dispatch.Futures;
+import akka.japi.Procedure;
+import akka.persistence.PersistentConfirmation;
+import akka.persistence.PersistentId;
+import akka.persistence.PersistentImpl;
+import akka.persistence.PersistentRepr;
+import akka.persistence.journal.japi.AsyncWriteJournal;
+
+public class InMemoryJournal extends AsyncWriteJournal {
+
+    private static Map<String, Map<Long, Object>> journals = new ConcurrentHashMap<>();
+
+    public static void addEntry(String persistenceId, long sequenceNr, Object data) {
+        Map<Long, Object> journal = journals.get(persistenceId);
+        if(journal == null) {
+            journal = Maps.newLinkedHashMap();
+            journals.put(persistenceId, journal);
+        }
+
+        journal.put(sequenceNr, data);
+    }
+
+    public static void clear() {
+        journals.clear();
+    }
+
+    @Override
+    public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr,
+            long toSequenceNr, long max, final Procedure<PersistentRepr> replayCallback) {
+        return Futures.future(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                Map<Long, Object> journal = journals.get(persistenceId);
+                if(journal == null) {
+                    return null;
+                }
+
+                for (Map.Entry<Long,Object> entry : journal.entrySet()) {
+                    PersistentRepr persistentMessage =
+                        new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId, false, null, null);
+                    replayCallback.apply(persistentMessage);
+                }
+
+                return null;
+            }
+        }, context().dispatcher());
+    }
+
+    @Override
+    public Future<Long> doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) {
+        return Futures.successful(new Long(0));
+    }
+
+    @Override
+    public Future<Void> doAsyncWriteMessages(Iterable<PersistentRepr> messages) {
+        return Futures.successful(null);
+    }
+
+    @Override
+    public Future<Void> doAsyncWriteConfirmations(Iterable<PersistentConfirmation> confirmations) {
+        return Futures.successful(null);
+    }
+
+    @Override
+    public Future<Void> doAsyncDeleteMessages(Iterable<PersistentId> messageIds, boolean permanent) {
+        return Futures.successful(null);
+    }
+
+    @Override
+    public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) {
+        return Futures.successful(null);
+    }
+}
index 0e492f0..22e522b 100644 (file)
@@ -16,46 +16,66 @@ import akka.persistence.SnapshotSelectionCriteria;
 import akka.persistence.snapshot.japi.SnapshotStore;
 import com.google.common.collect.Iterables;
 import scala.concurrent.Future;
-
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.opendaylight.controller.cluster.raft.Snapshot;
 
 public class InMemorySnapshotStore extends SnapshotStore {
 
-    Map<String, List<Snapshot>> snapshots = new HashMap<>();
+    private static Map<String, List<StoredSnapshot>> snapshots = new ConcurrentHashMap<>();
+
+    public static void addSnapshot(String persistentId, Snapshot snapshot) {
+        List<StoredSnapshot> snapshotList = snapshots.get(persistentId);
+
+        if(snapshotList == null) {
+            snapshotList = new ArrayList<>();
+            snapshots.put(persistentId, snapshotList);
+        }
+
+        snapshotList.add(new StoredSnapshot(new SnapshotMetadata(persistentId, snapshotList.size(),
+                System.currentTimeMillis()), snapshot));
+    }
+
+    public static void clear() {
+        snapshots.clear();
+    }
 
-    @Override public Future<Option<SelectedSnapshot>> doLoadAsync(String s,
+    @Override
+    public Future<Option<SelectedSnapshot>> doLoadAsync(String s,
         SnapshotSelectionCriteria snapshotSelectionCriteria) {
-        List<Snapshot> snapshotList = snapshots.get(s);
+        List<StoredSnapshot> snapshotList = snapshots.get(s);
         if(snapshotList == null){
             return Futures.successful(Option.<SelectedSnapshot>none());
         }
 
-        Snapshot snapshot = Iterables.getLast(snapshotList);
+        StoredSnapshot snapshot = Iterables.getLast(snapshotList);
         SelectedSnapshot selectedSnapshot =
             new SelectedSnapshot(snapshot.getMetadata(), snapshot.getData());
         return Futures.successful(Option.some(selectedSnapshot));
     }
 
-    @Override public Future<Void> doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) {
-        List<Snapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
+    @Override
+    public Future<Void> doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) {
+        List<StoredSnapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
 
         if(snapshotList == null){
             snapshotList = new ArrayList<>();
             snapshots.put(snapshotMetadata.persistenceId(), snapshotList);
         }
-        snapshotList.add(new Snapshot(snapshotMetadata, o));
+        snapshotList.add(new StoredSnapshot(snapshotMetadata, o));
 
         return Futures.successful(null);
     }
 
-    @Override public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception {
+    @Override
+    public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception {
     }
 
-    @Override public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception {
-        List<Snapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
+    @Override
+    public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception {
+        List<StoredSnapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
 
         if(snapshotList == null){
             return;
@@ -64,7 +84,7 @@ public class InMemorySnapshotStore extends SnapshotStore {
         int deleteIndex = -1;
 
         for(int i=0;i<snapshotList.size(); i++){
-            Snapshot snapshot = snapshotList.get(i);
+            StoredSnapshot snapshot = snapshotList.get(i);
             if(snapshotMetadata.equals(snapshot.getMetadata())){
                 deleteIndex = i;
                 break;
@@ -77,9 +97,10 @@ public class InMemorySnapshotStore extends SnapshotStore {
 
     }
 
-    @Override public void doDelete(String s, SnapshotSelectionCriteria snapshotSelectionCriteria)
+    @Override
+    public void doDelete(String s, SnapshotSelectionCriteria snapshotSelectionCriteria)
         throws Exception {
-        List<Snapshot> snapshotList = snapshots.get(s);
+        List<StoredSnapshot> snapshotList = snapshots.get(s);
 
         if(snapshotList == null){
             return;
@@ -90,11 +111,11 @@ public class InMemorySnapshotStore extends SnapshotStore {
         snapshots.remove(s);
     }
 
-    private static class Snapshot {
+    private static class StoredSnapshot {
         private final SnapshotMetadata metadata;
         private final Object data;
 
-        private Snapshot(SnapshotMetadata metadata, Object data) {
+        private StoredSnapshot(SnapshotMetadata metadata, Object data) {
             this.metadata = metadata;
             this.data = data;
         }
index f0dadc6..3a37dd9 100644 (file)
@@ -1,5 +1,6 @@
 akka {
     persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+    persistence.journal.plugin = "in-memory-journal"
 
     loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"]
 
@@ -17,6 +18,10 @@ akka {
     }
 }
 
+in-memory-journal {
+    class = "org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal"
+}
+
 in-memory-snapshot-store {
   # Class name of the plugin.
   class = "org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore"
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/HashMapDataStoreModule.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/HashMapDataStoreModule.java
deleted file mode 100644 (file)
index df1b5a3..0000000
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.config.yang.md.sal.dom.impl;
-
-import org.opendaylight.controller.sal.dom.broker.impl.HashMapDataStore;
-
-/**
-*
-*/
-public final class HashMapDataStoreModule extends org.opendaylight.controller.config.yang.md.sal.dom.impl.AbstractHashMapDataStoreModule
-{
-
-    public HashMapDataStoreModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
-        super(identifier, dependencyResolver);
-    }
-
-    public HashMapDataStoreModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, HashMapDataStoreModule oldModule, java.lang.AutoCloseable oldInstance) {
-        super(identifier, dependencyResolver, oldModule, oldInstance);
-    }
-
-    @Override
-    public void validate(){
-        super.validate();
-        // Add custom validation for module attributes here.
-    }
-
-    @Override
-    public java.lang.AutoCloseable createInstance() {
-        HashMapDataStore store = new HashMapDataStore();
-        return store;
-    }
-}
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/HashMapDataStoreModuleFactory.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/HashMapDataStoreModuleFactory.java
deleted file mode 100644 (file)
index 6b5503f..0000000
+++ /dev/null
@@ -1,17 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.config.yang.md.sal.dom.impl;
-
-/**
-*
-*/
-public class HashMapDataStoreModuleFactory extends org.opendaylight.controller.config.yang.md.sal.dom.impl.AbstractHashMapDataStoreModuleFactory
-{
-
-
-}
index 8ed5206..5fbf127 100644 (file)
@@ -8,7 +8,6 @@
 package org.opendaylight.controller.md.sal.dom.broker.impl;
 
 import static com.google.common.base.Preconditions.checkState;
-import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import java.util.EnumMap;
@@ -102,6 +101,6 @@ public class DOMDataBrokerImpl extends AbstractDOMForwardedTransactionFactory<DO
     public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
             final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
         LOG.debug("Transaction: {} submitted with cohorts {}.", transaction.getIdentifier(), cohorts);
-        return coordinator.submit(transaction, cohorts, Optional.<DOMDataCommitErrorListener> absent());
+        return coordinator.submit(transaction, cohorts);
     }
 }
index 7cd6afa..0b1dd1c 100644 (file)
@@ -6,11 +6,14 @@
  */
 package org.opendaylight.controller.md.sal.dom.broker.impl;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
@@ -27,16 +30,27 @@ import org.slf4j.LoggerFactory;
  * {@link org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType} type.
  *
  */
-public class DOMDataBrokerTransactionChainImpl extends AbstractDOMForwardedTransactionFactory<DOMStoreTransactionChain>
-        implements DOMTransactionChain, DOMDataCommitErrorListener {
+final class DOMDataBrokerTransactionChainImpl extends AbstractDOMForwardedTransactionFactory<DOMStoreTransactionChain>
+        implements DOMTransactionChain {
+    private static enum State {
+        RUNNING,
+        CLOSING,
+        CLOSED,
+        FAILED,
+    }
 
+    private static final AtomicIntegerFieldUpdater<DOMDataBrokerTransactionChainImpl> COUNTER_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(DOMDataBrokerTransactionChainImpl.class, "counter");
+    private static final AtomicReferenceFieldUpdater<DOMDataBrokerTransactionChainImpl, State> STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(DOMDataBrokerTransactionChainImpl.class, State.class, "state");
     private static final Logger LOG = LoggerFactory.getLogger(DOMDataBrokerTransactionChainImpl.class);
     private final AtomicLong txNum = new AtomicLong();
     private final DOMDataCommitExecutor coordinator;
     private final TransactionChainListener listener;
     private final long chainId;
 
-    private volatile boolean failed = false;
+    private volatile State state = State.RUNNING;
+    private volatile int counter = 0;
 
     /**
      *
@@ -62,37 +76,70 @@ public class DOMDataBrokerTransactionChainImpl extends AbstractDOMForwardedTrans
         this.listener = Preconditions.checkNotNull(listener);
     }
 
+    private void checkNotFailed() {
+        Preconditions.checkState(state != State.FAILED, "Transaction chain has failed");
+    }
+
     @Override
     protected Object newTransactionIdentifier() {
         return "DOM-CHAIN-" + chainId + "-" + txNum.getAndIncrement();
     }
 
     @Override
-    public CheckedFuture<Void,TransactionCommitFailedException> submit(
+    public CheckedFuture<Void, TransactionCommitFailedException> submit(
             final DOMDataWriteTransaction transaction, final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
+        checkNotFailed();
         checkNotClosed();
 
-        return coordinator.submit(transaction, cohorts, Optional.<DOMDataCommitErrorListener> of(this));
+        final CheckedFuture<Void, TransactionCommitFailedException> ret = coordinator.submit(transaction, cohorts);
+
+        COUNTER_UPDATER.incrementAndGet(this);
+        Futures.addCallback(ret, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                transactionCompleted();
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                transactionFailed(transaction, t);
+            }
+        });
+
+        return ret;
     }
 
     @Override
     public void close() {
-        super.close();
+        final boolean success = STATE_UPDATER.compareAndSet(this, State.RUNNING, State.CLOSING);
+        if (!success) {
+            LOG.debug("Chain {} is no longer running", this);
+            return;
+        }
 
+        super.close();
         for (DOMStoreTransactionChain subChain : getTxFactories().values()) {
             subChain.close();
         }
 
-        if (!failed) {
-            LOG.debug("Transaction chain {}¬†successfully finished.", this);
-            // FIXME: this event should be emitted once all operations complete
-            listener.onTransactionChainSuccessful(this);
+        if (counter == 0) {
+            finishClose();
         }
     }
 
-    @Override
-    public void onCommitFailed(final DOMDataWriteTransaction tx, final Throwable cause) {
-        failed = true;
+    private void finishClose() {
+        state = State.CLOSED;
+        listener.onTransactionChainSuccessful(this);
+    }
+
+    private void transactionCompleted() {
+        if (COUNTER_UPDATER.decrementAndGet(this) == 0 && state == State.CLOSING) {
+            finishClose();
+        }
+    }
+
+    private void transactionFailed(final DOMDataWriteTransaction tx, final Throwable cause) {
+        state = State.FAILED;
         LOG.debug("Transaction chain {}¬†failed.", this, cause);
         listener.onTransactionChainFailed(this, tx, cause);
     }
index 77cf105..15d7b1d 100644 (file)
@@ -6,7 +6,6 @@
  */
 package org.opendaylight.controller.md.sal.dom.broker.impl;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
@@ -63,16 +62,15 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
 
     @Override
     public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
-            final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
+            final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
         Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
         Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
-        Preconditions.checkArgument(listener != null, "Listener must not be null");
         LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
 
         ListenableFuture<Void> commitFuture = null;
         try {
             commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts,
-                    listener, commitStatsTracker));
+                    commitStatsTracker));
         } catch(RejectedExecutionException e) {
             LOG.error("The commit executor's queue is full - submit task was rejected. \n" +
                       executor, e);
@@ -81,10 +79,6 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
                         "Could not submit the commit task - the commit queue capacity has been exceeded.", e));
         }
 
-        if (listener.isPresent()) {
-            Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
-        }
-
         return MappingCheckedFuture.create(commitFuture,
                 TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
     }
@@ -141,7 +135,6 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
 
         public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
                 final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
-                final Optional<DOMDataCommitErrorListener> listener,
                 final DurationStatsTracker commitStatTracker) {
             this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
             this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorInvoker.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorInvoker.java
deleted file mode 100644 (file)
index 5ce9241..0000000
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.dom.broker.impl;
-
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-
-/**
- *
- * Utility implemetation of {@link FutureCallback} which is responsible
- * for invoking {@link DOMDataCommitErrorListener} on TransactionCommit failed.
- *
- * When {@link #onFailure(Throwable)} is invoked, supplied {@link DOMDataCommitErrorListener}
- * callback is invoked with associated transaction and throwable is invoked on listener.
- *
- */
-class DOMDataCommitErrorInvoker implements FutureCallback<Void> {
-
-    private final DOMDataWriteTransaction tx;
-    private final DOMDataCommitErrorListener listener;
-
-
-    /**
-     *
-     * Construct new DOMDataCommitErrorInvoker.
-     *
-     * @param transaction Transaction which should be passed as argument to {@link DOMDataCommitErrorListener#onCommitFailed(DOMDataWriteTransaction, Throwable)}
-     * @param listener Listener which should be invoked on error.
-     */
-    public DOMDataCommitErrorInvoker(DOMDataWriteTransaction transaction, DOMDataCommitErrorListener listener) {
-        this.tx = Preconditions.checkNotNull(transaction, "Transaction must not be null");
-        this.listener = Preconditions.checkNotNull(listener, "Listener must not be null");
-    }
-
-    @Override
-    public void onFailure(Throwable t) {
-        listener.onCommitFailed(tx, t);
-    }
-
-    @Override
-    public void onSuccess(Void result) {
-        // NOOP
-    }
-}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorListener.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorListener.java
deleted file mode 100644 (file)
index 80bc669..0000000
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.dom.broker.impl;
-
-import java.util.EventListener;
-
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-
-/**
- *
- * Listener on transaction failure which may be passed to
- * {@link DOMDataCommitExecutor}. This listener is notified during transaction
- * processing, before result is delivered to other client code outside MD-SAL.
- * This allows implementors to update their internal state before transaction
- * failure is visible to client code.
- *
- * This is internal API for MD-SAL implementations, for consumer facing error
- * listeners see {@link org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener}.
- *
- */
-interface DOMDataCommitErrorListener extends EventListener {
-
-    /**
-     *
-     * Callback which is invoked on transaction failure during three phase
-     * commit in {@link DOMDataCommitExecutor}.
-     *
-     *
-     * Implementation of this callback MUST NOT do any blocking calls or any
-     * calls to MD-SAL, since this callback is invoked synchronously on MD-SAL
-     * Broker coordination thread.
-     *
-     * @param tx
-     *            Transaction which failed
-     * @param cause
-     *            Failure reason
-     */
-    void onCommitFailed(DOMDataWriteTransaction tx, Throwable cause);
-
-}
index 234758c..8aa97e7 100644 (file)
@@ -7,11 +7,10 @@
  */
 package org.opendaylight.controller.md.sal.dom.broker.impl;
 
+import com.google.common.util.concurrent.CheckedFuture;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
 
 /**
  * Executor of Three Phase Commit coordination for
@@ -35,15 +34,13 @@ interface DOMDataCommitExecutor {
      *            Transaction to be used as context for reporting
      * @param cohort
      *            DOM Store cohorts representing provided transaction, its
-     *            subtransactoins.
-     * @param listener
-     *            Error listener which should be notified if transaction failed.
+     *            subtransactions.
      * @return a CheckedFuture. if commit coordination on cohorts finished successfully,
      *         nothing is returned from the Future, On failure,
      *         the Future fails with a {@link TransactionCommitFailedException}.
      *
      */
     CheckedFuture<Void,TransactionCommitFailedException> submit(DOMDataWriteTransaction tx,
-            Iterable<DOMStoreThreePhaseCommitCohort> cohort, Optional<DOMDataCommitErrorListener> listener);
+            Iterable<DOMStoreThreePhaseCommitCohort> cohort);
 
 }
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/HashMapDataStore.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/HashMapDataStore.java
deleted file mode 100644 (file)
index 1f82bd7..0000000
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.dom.broker.impl;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
-import org.opendaylight.controller.md.sal.common.api.data.DataModification;
-import org.opendaylight.controller.sal.core.api.data.DataStore;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public final class HashMapDataStore implements DataStore, AutoCloseable {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(HashMapDataStore.class);
-
-    private final Map<YangInstanceIdentifier, CompositeNode> configuration = new ConcurrentHashMap<YangInstanceIdentifier, CompositeNode>();
-    private final Map<YangInstanceIdentifier, CompositeNode> operational = new ConcurrentHashMap<YangInstanceIdentifier, CompositeNode>();
-
-    @Override
-    public boolean containsConfigurationPath(final YangInstanceIdentifier path) {
-        return configuration.containsKey(path);
-    }
-
-    @Override
-    public boolean containsOperationalPath(final YangInstanceIdentifier path) {
-        return operational.containsKey(path);
-    }
-
-    @Override
-    public Iterable<YangInstanceIdentifier> getStoredConfigurationPaths() {
-        return configuration.keySet();
-    }
-
-    @Override
-    public Iterable<YangInstanceIdentifier> getStoredOperationalPaths() {
-        return operational.keySet();
-    }
-
-    @Override
-    public CompositeNode readConfigurationData(final YangInstanceIdentifier path) {
-        LOG.trace("Reading configuration path {}", path);
-        return configuration.get(path);
-    }
-
-    @Override
-    public CompositeNode readOperationalData(YangInstanceIdentifier path) {
-        LOG.trace("Reading operational path {}", path);
-        return operational.get(path);
-    }
-
-    @Override
-    public DataCommitHandler.DataCommitTransaction<YangInstanceIdentifier, CompositeNode> requestCommit(
-            final DataModification<YangInstanceIdentifier, CompositeNode> modification) {
-        return new HashMapDataStoreTransaction(modification, this);
-    }
-
-    public RpcResult<Void> rollback(HashMapDataStoreTransaction transaction) {
-        return RpcResultBuilder.<Void> success().build();
-    }
-
-    public RpcResult<Void> finish(HashMapDataStoreTransaction transaction) {
-        final DataModification<YangInstanceIdentifier, CompositeNode> modification = transaction
-                .getModification();
-        for (final YangInstanceIdentifier removal : modification
-                .getRemovedConfigurationData()) {
-            LOG.trace("Removing configuration path {}", removal);
-            remove(configuration, removal);
-        }
-        for (final YangInstanceIdentifier removal : modification
-                .getRemovedOperationalData()) {
-            LOG.trace("Removing operational path {}", removal);
-            remove(operational, removal);
-        }
-        if (LOG.isTraceEnabled()) {
-            for (final YangInstanceIdentifier a : modification
-                    .getUpdatedConfigurationData().keySet()) {
-                LOG.trace("Adding configuration path {}", a);
-            }
-            for (final YangInstanceIdentifier a : modification
-                    .getUpdatedOperationalData().keySet()) {
-                LOG.trace("Adding operational path {}", a);
-            }
-        }
-        configuration.putAll(modification.getUpdatedConfigurationData());
-        operational.putAll(modification.getUpdatedOperationalData());
-
-        return RpcResultBuilder.<Void> success().build();
-    }
-
-    public void remove(final Map<YangInstanceIdentifier, CompositeNode> map,
-            final YangInstanceIdentifier identifier) {
-        Set<YangInstanceIdentifier> affected = new HashSet<YangInstanceIdentifier>();
-        for (final YangInstanceIdentifier path : map.keySet()) {
-            if (identifier.contains(path)) {
-                affected.add(path);
-            }
-        }
-        for (final YangInstanceIdentifier pathToRemove : affected) {
-            LOG.trace("Removed path {}", pathToRemove);
-            map.remove(pathToRemove);
-        }
-    }
-
-    @Override
-    public void close() {
-        // NOOP
-    }
-}
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/HashMapDataStoreTransaction.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/HashMapDataStoreTransaction.java
deleted file mode 100644 (file)
index ee026b6..0000000
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.dom.broker.impl;
-
-import org.opendaylight.controller.md.sal.common.api.data.DataModification;
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
-public class HashMapDataStoreTransaction implements
-        DataCommitTransaction<YangInstanceIdentifier, CompositeNode> {
-    private final DataModification<YangInstanceIdentifier, CompositeNode> modification;
-    private final HashMapDataStore datastore;
-
-    HashMapDataStoreTransaction(
-            final DataModification<YangInstanceIdentifier, CompositeNode> modify,
-            final HashMapDataStore store) {
-        modification = modify;
-        datastore = store;
-    }
-
-    @Override
-    public RpcResult<Void> finish() throws IllegalStateException {
-        return datastore.finish(this);
-    }
-
-    @Override
-    public DataModification<YangInstanceIdentifier, CompositeNode> getModification() {
-        return this.modification;
-    }
-
-    @Override
-    public RpcResult<Void> rollback() throws IllegalStateException {
-        return datastore.rollback(this);
-    }
-}
\ No newline at end of file
index b1df7ef..e81f71a 100644 (file)
@@ -32,12 +32,6 @@ module opendaylight-sal-dom-broker-impl {
         config:provided-service sal:dom-async-data-broker;
     }
     
-    identity hash-map-data-store {
-        base config:module-type;
-        config:provided-service sal:dom-data-store;
-        config:java-name-prefix HashMapDataStore;
-    }
-    
     identity schema-service-singleton {
         base config:module-type;
         config:provided-service sal:schema-service;
@@ -129,12 +123,6 @@ module opendaylight-sal-dom-broker-impl {
         }
     }
     
-    augment "/config:modules/config:module/config:state" {
-        case hash-map-data-store {
-            when "/config:modules/config:module/config:type = 'hash-map-data-store'";
-        }
-    }
-    
     augment "/config:modules/config:module/config:state" {
         case schema-service-singleton {
             when "/config:modules/config:module/config:type = 'schema-service-singleton'";
@@ -149,4 +137,4 @@ module opendaylight-sal-dom-broker-impl {
             } 
         }
     }
-}
\ No newline at end of file
+}
index b421e56..cde0157 100644 (file)
@@ -1,7 +1,5 @@
 package org.opendaylight.xsql;
 
-import java.util.concurrent.ExecutionException;
-
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.xsql.rev140626.XSQL;
@@ -25,16 +23,15 @@ public class XSQLProvider implements AutoCloseable {
             XSQLBuilder builder = new XSQLBuilder();
             builder.setPort("34343");
             XSQL xsql = builder.build();
-            if (dps != null) {
-                final DataModificationTransaction t = dps.beginTransaction();
-                t.removeOperationalData(ID);
-                t.putOperationalData(ID,xsql);
-
-                try {
+            try {
+                if (dps != null) {
+                    final DataModificationTransaction t = dps.beginTransaction();
+                    t.removeOperationalData(ID);
+                    t.putOperationalData(ID,xsql);
                     t.commit().get();
-                } catch (InterruptedException | ExecutionException e) {
-                   LOG.warn("Failed to update toaster status, operational otherwise", e);
                 }
+            } catch (Exception e) {
+                LOG.warn("Failed to update XSQL port status, ", e);
             }
         return xsql;
     }
index c32d960..a1bf2c8 100644 (file)
@@ -1,21 +1,21 @@
 <?xml version="1.0" encoding="UTF-8"?>\r
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">\r
 \r
-    <!--
-        Licensed to the Apache Software Foundation (ASF) under one or more
-        contributor license agreements.  See the NOTICE file distributed with
-        this work for additional information regarding copyright ownership.
-        The ASF licenses this file to You under the Apache License, Version 2.0
-        (the "License"); you may not use this file except in compliance with
-        the License.  You may obtain a copy of the License at
-
-            http://www.apache.org/licenses/LICENSE-2.0
-
-        Unless required by applicable law or agreed to in writing, software
-        distributed under the License is distributed on an "AS IS" BASIS,
-        WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-        See the License for the specific language governing permissions and
-        limitations under the License.
+    <!--\r
+        Licensed to the Apache Software Foundation (ASF) under one or more\r
+        contributor license agreements.  See the NOTICE file distributed with\r
+        this work for additional information regarding copyright ownership.\r
+        The ASF licenses this file to You under the Apache License, Version 2.0\r
+        (the "License"); you may not use this file except in compliance with\r
+        the License.  You may obtain a copy of the License at\r
+\r
+            http://www.apache.org/licenses/LICENSE-2.0\r
+\r
+        Unless required by applicable law or agreed to in writing, software\r
+        distributed under the License is distributed on an "AS IS" BASIS,\r
+        WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+        See the License for the specific language governing permissions and\r
+        limitations under the License.\r
     -->\r
 \r
     <modelVersion>4.0.0</modelVersion>\r
     <version>1.1-SNAPSHOT</version>\r
   </parent>\r
 \r
-    <groupId>xsqlcommand</groupId>\r
-    <artifactId>xsqlcommand</artifactId>\r
+    <groupId>org.opendaylight.controller</groupId>\r
+    <artifactId>sal-karaf-xsql</artifactId>\r
     <packaging>bundle</packaging>\r
-    <version>1.0.0-SNAPSHOT</version>\r
     <name>Apache Karaf :: Shell odl/xsql Commands</name>\r
 \r
     <description>Provides the OSGi odl commands</description>\r
@@ -64,7 +63,6 @@
         <dependency>\r
             <groupId>org.opendaylight.controller</groupId>\r
             <artifactId>sal-dom-xsql</artifactId>\r
-            <type>bundle</type>\r
             <version>1.1-SNAPSHOT</version>\r
         </dependency>\r
     </dependencies>\r
diff --git a/opendaylight/md-sal/sal-karaf-xsql/src/main/resources/OSGI-INF/blueprint/shell-log.xml b/opendaylight/md-sal/sal-karaf-xsql/src/main/resources/OSGI-INF/blueprint/shell-log.xml
new file mode 100644 (file)
index 0000000..e9a4a23
--- /dev/null
@@ -0,0 +1,12 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
+           xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.0.0">
+
+    <command-bundle xmlns="http://karaf.apache.org/xmlns/shell/v1.1.0">
+        <command>
+            <action class="org.opendaylight.controller.xsql.xsql">
+            </action>
+        </command>
+    </command-bundle>
+</blueprint>
index 2e2100b..e0a51a8 100644 (file)
@@ -4,6 +4,11 @@
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
 
+  <parent>
+     <groupId>org.opendaylight.controller.samples</groupId>
+     <artifactId>sal-samples</artifactId>
+     <version>1.1-SNAPSHOT</version>
+  </parent>
 
   <artifactId>l2switch.aggregator</artifactId>
   <groupId>org.opendaylight.controller.samples.l2switch</groupId>
index 361373d..d8602c2 100644 (file)
@@ -7,17 +7,8 @@
  */
 package org.opendaylight.md.controller.topology.manager;
 
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.getNodeConnectorKey;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.getNodeKey;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTerminationPoint;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTerminationPointId;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyLink;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNode;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNodeId;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -48,12 +39,16 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
+import java.util.Collections;
+import java.util.List;
+
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.getNodeConnectorKey;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.getNodeKey;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTerminationPoint;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTerminationPointId;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyLink;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNode;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNodeId;
 
 class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, OpendaylightInventoryListener {
 
@@ -73,11 +68,20 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
         final NodeId nodeId = toTopologyNodeId(getNodeKey(notification.getNodeRef()).getId());
         final InstanceIdentifier<Node> nodeInstance = toNodeIdentifier(notification.getNodeRef());
 
+
         processor.enqueueOperation(new TopologyOperation() {
             @Override
             public void applyOperation(ReadWriteTransaction transaction) {
-                removeAffectedLinks(nodeId, transaction);
-                transaction.delete(LogicalDatastoreType.OPERATIONAL, nodeInstance);
+                Optional<Node> nodeOptional = Optional.absent();
+                try {
+                    nodeOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, nodeInstance).checkedGet();
+                } catch (ReadFailedException e) {
+                    LOG.error("Error occured when trying to read Node ", e);
+                }
+                if (nodeOptional.isPresent()) {
+                    removeAffectedLinks(nodeId, transaction);
+                    transaction.delete(LogicalDatastoreType.OPERATIONAL, nodeInstance);
+                }
             }
 
             @Override
@@ -119,8 +123,16 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
         processor.enqueueOperation(new TopologyOperation() {
             @Override
             public void applyOperation(ReadWriteTransaction transaction) {
-                removeAffectedLinks(tpId, transaction);
-                transaction.delete(LogicalDatastoreType.OPERATIONAL, tpInstance);
+                Optional<TerminationPoint> terminationPointOptional = Optional.absent();
+                try {
+                    terminationPointOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, tpInstance).checkedGet();
+                } catch (ReadFailedException e) {
+                    LOG.error("Error occured when trying to read NodeConnector ", e);
+                }
+                if (terminationPointOptional.isPresent()) {
+                    removeAffectedLinks(tpId, transaction);
+                    transaction.delete(LogicalDatastoreType.OPERATIONAL, tpInstance);
+                }
             }
 
             @Override
@@ -164,7 +176,7 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
             public void applyOperation(final ReadWriteTransaction transaction) {
                 final Link link = toTopologyLink(notification);
                 final InstanceIdentifier<Link> path = linkPath(link);
-                transaction.merge(LogicalDatastoreType.OPERATIONAL, path, link, true);
+                transaction.put(LogicalDatastoreType.OPERATIONAL, path, link, true);
             }
 
             @Override
@@ -184,7 +196,17 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
         processor.enqueueOperation(new TopologyOperation() {
             @Override
             public void applyOperation(final ReadWriteTransaction transaction) {
-                transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(toTopologyLink(notification)));
+                Optional<Link> linkOptional = Optional.absent();
+                try {
+                    // read that checks if link exists (if we do not do this we might get an exception on delete)
+                    linkOptional = transaction.read(LogicalDatastoreType.OPERATIONAL,
+                            linkPath(toTopologyLink(notification))).checkedGet();
+                } catch (ReadFailedException e) {
+                    LOG.error("Error occured when trying to read Link ", e);
+                }
+                if (linkOptional.isPresent()) {
+                    transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(toTopologyLink(notification)));
+                }
             }
 
             @Override
@@ -194,6 +216,7 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
         });
     }
 
+
     @Override
     public void onLinkUtilizationNormal(final LinkUtilizationNormal notification) {
         // NOOP
@@ -212,89 +235,57 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
     }
 
     private void removeAffectedLinks(final NodeId id, final ReadWriteTransaction transaction) {
-        CheckedFuture<Optional<Topology>, ReadFailedException> topologyDataFuture =
-                transaction.read(LogicalDatastoreType.OPERATIONAL, topology);
-        Futures.addCallback(topologyDataFuture, new FutureCallback<Optional<Topology>>() {
-            @Override
-            public void onSuccess(Optional<Topology> topologyOptional) {
-                removeAffectedLinks(id, topologyOptional);
-            }
-
-            @Override
-            public void onFailure(Throwable throwable) {
-                LOG.error("Error reading topology data for topology {}", topology, throwable);
-            }
-        });
+        Optional<Topology> topologyOptional = Optional.absent();
+        try {
+            topologyOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, topology).checkedGet();
+        } catch (ReadFailedException e) {
+            LOG.error("Error reading topology data for topology {}", topology, e);
+        }
+        if (topologyOptional.isPresent()) {
+            removeAffectedLinks(id, topologyOptional, transaction);
+        }
     }
 
-    private void removeAffectedLinks(final NodeId id, Optional<Topology> topologyOptional) {
+    private void removeAffectedLinks(final NodeId id, Optional<Topology> topologyOptional, ReadWriteTransaction transaction) {
         if (!topologyOptional.isPresent()) {
             return;
         }
 
         List<Link> linkList = topologyOptional.get().getLink() != null ?
                 topologyOptional.get().getLink() : Collections.<Link> emptyList();
-        final List<InstanceIdentifier<Link>> linkIDsToDelete = Lists.newArrayList();
         for (Link link : linkList) {
             if (id.equals(link.getSource().getSourceNode()) ||
                     id.equals(link.getDestination().getDestNode())) {
-                linkIDsToDelete.add(linkPath(link));
+                transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(link));
             }
         }
-
-        enqueueLinkDeletes(linkIDsToDelete);
-    }
-
-    private void enqueueLinkDeletes(final Collection<InstanceIdentifier<Link>> linkIDsToDelete) {
-        if(!linkIDsToDelete.isEmpty()) {
-            processor.enqueueOperation(new TopologyOperation() {
-                @Override
-                public void applyOperation(ReadWriteTransaction transaction) {
-                    for(InstanceIdentifier<Link> linkID: linkIDsToDelete) {
-                        transaction.delete(LogicalDatastoreType.OPERATIONAL, linkID);
-                    }
-                }
-
-                @Override
-                public String toString() {
-                    return "Delete Links " + linkIDsToDelete.size();
-                }
-            });
-        }
     }
 
     private void removeAffectedLinks(final TpId id, final ReadWriteTransaction transaction) {
-        CheckedFuture<Optional<Topology>, ReadFailedException> topologyDataFuture =
-                transaction.read(LogicalDatastoreType.OPERATIONAL, topology);
-        Futures.addCallback(topologyDataFuture, new FutureCallback<Optional<Topology>>() {
-            @Override
-            public void onSuccess(Optional<Topology> topologyOptional) {
-                removeAffectedLinks(id, topologyOptional);
-            }
-
-            @Override
-            public void onFailure(Throwable throwable) {
-                LOG.error("Error reading topology data for topology {}", topology, throwable);
-            }
-        });
+        Optional<Topology> topologyOptional = Optional.absent();
+        try {
+            topologyOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, topology).checkedGet();
+        } catch (ReadFailedException e) {
+            LOG.error("Error reading topology data for topology {}", topology, e);
+        }
+ &