Merge "BUG-1845: implement proper shutdown sequence"
authorTony Tkacik <ttkacik@cisco.com>
Mon, 22 Sep 2014 14:13:03 +0000 (14:13 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 22 Sep 2014 14:13:03 +0000 (14:13 +0000)
59 files changed:
features/mdsal/pom.xml
features/mdsal/src/main/resources/features.xml
opendaylight/archetypes/opendaylight-karaf-distro-archetype/pom.xml
opendaylight/archetypes/opendaylight-karaf-features/pom.xml
opendaylight/commons/opendaylight/pom.xml
opendaylight/config/config-persister-feature-adapter/src/main/java/org/opendaylight/controller/configpusherfeature/internal/FeatureConfigPusher.java
opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/etc/custom.properties
opendaylight/distribution/opendaylight/src/main/resources/configuration/config.ini
opendaylight/karaf-branding/pom.xml
opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/ComponentActivator.java
opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/InventoryAndReadAdapter.java
opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/NodeMapping.java
opendaylight/md-sal/compatibility/sal-compatibility/src/test/java/org/opendaylight/controller/sal/compatibility/test/NodeMappingTest.java
opendaylight/md-sal/forwardingrules-manager/src/main/java/org/opendaylight/controller/frm/impl/FlowForwarder.java
opendaylight/md-sal/pom.xml
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/NormalizedNodeSerializer.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/PathArgumentSerializer.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueSerializer.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueType.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemoryJournal.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemorySnapshotStore.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf
opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/xsql/XSQLProvider.java
opendaylight/md-sal/sal-karaf-xsql/pom.xml
opendaylight/md-sal/sal-karaf-xsql/src/main/resources/OSGI-INF/blueprint/shell-log.xml [new file with mode: 0644]
opendaylight/md-sal/samples/l2switch/pom.xml
opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java
opendaylight/netconf/netconf-monitoring/src/test/java/org/opendaylight/controller/netconf/monitoring/xml/JaxBSerializerTest.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHanderReader.java [new file with mode: 0644]
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java [new file with mode: 0644]
opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerTest.java
opendaylight/northbound/networkconfiguration/neutron/pom.xml
opendaylight/northbound/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/northbound/NeutronLoadBalancerPoolNorthbound.java

index f45f680..299e5b6 100644 (file)
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-dom-xsql</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-karaf-xsql</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-dom-xsql-config</artifactId>
index 0e24176..2a988ce 100644 (file)
@@ -65,6 +65,7 @@
     <feature name ='odl-mdsal-xsql' version='${project.version}'>
         <feature version='${project.version}'>odl-mdsal-broker</feature>
         <bundle>mvn:org.opendaylight.controller/sal-dom-xsql/${project.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/sal-karaf-xsql/${project.version}</bundle>
         <configfile finalname="${config.configfile.directory}/${config.xsql.configfile}">mvn:org.opendaylight.controller/sal-dom-xsql-config/${project.version}/xml/config</configfile>
     </feature>
     <feature name ='odl-mdsal-apidocs' version='${project.version}'>
index 8883c64..9081ce7 100644 (file)
@@ -1,7 +1,11 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
-
+  <parent>
+     <groupId>org.opendaylight.controller.archetypes</groupId>
+     <artifactId>archetypes-parent</artifactId>
+     <version>0.1.1-SNAPSHOT</version>
+  </parent>
   <groupId>org.opendaylight.controller</groupId>
   <artifactId>opendaylight-karaf-distro-archetype</artifactId>
   <version>1.0.0-SNAPSHOT</version>
index 4973a69..264402a 100644 (file)
@@ -2,6 +2,11 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
 
+  <parent>
+     <groupId>org.opendaylight.controller.archetypes</groupId>
+     <artifactId>archetypes-parent</artifactId>
+     <version>0.1.1-SNAPSHOT</version>
+  </parent>
   <groupId>org.opendaylight.controller</groupId>
   <artifactId>opendaylight-karaf-features-archetype</artifactId>
   <version>1.0.0-SNAPSHOT</version>
index e3ffbf3..a98afc8 100644 (file)
         <artifactId>sal-dom-xsql</artifactId>
         <version>${mdsal.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>sal-karaf-xsql</artifactId>
+        <version>${mdsal.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.opendaylight.controller</groupId>
         <artifactId>sal-dom-xsql-config</artifactId>
index 1c094ad..57052f9 100644 (file)
@@ -26,6 +26,7 @@ import com.google.common.collect.LinkedHashMultimap;
  */
 public class FeatureConfigPusher {
     private static final Logger logger = LoggerFactory.getLogger(FeatureConfigPusher.class);
+    private static final int MAX_RETRIES=100;
     private FeaturesService featuresService = null;
     private ConfigPusher pusher = null;
     /*
@@ -82,7 +83,29 @@ public class FeatureConfigPusher {
     }
 
     private boolean isInstalled(Feature feature) {
-        List<Feature> installedFeatures = Arrays.asList(featuresService.listInstalledFeatures());
+        List<Feature> installedFeatures= null;
+        boolean cont = true;
+        int retries = 0;
+        while(cont) {
+            try {
+                installedFeatures = Arrays.asList(featuresService.listInstalledFeatures());
+                break;
+            } catch (Exception e) {
+                if(retries < MAX_RETRIES) {
+                    logger.warn("Karaf featuresService.listInstalledFeatures() has thrown an exception, retry {}, Exception {}", retries,e);
+                    try {
+                        Thread.sleep(1);
+                    } catch (InterruptedException e1) {
+                        throw new IllegalStateException(e1);
+                    }
+                    retries++;
+                    continue;
+                } else {
+                    logger.error("Giving up on Karaf featuresService.listInstalledFeatures() which has thrown an exception, retry {}, Exception {}", retries,e);
+                    throw e;
+                }
+            }
+        }
         return installedFeatures.contains(feature);
     }
 
index cdb6542..4a8f5ae 100644 (file)
@@ -94,6 +94,10 @@ ovsdb.listenPort=6640
 # default Openflow version = 1.0, we also support 1.3.
 # ovsdb.of.version=1.3
 
+# ovsdb can be configured with ml2 to perform l3 forwarding. The config below enables that functionality, which is
+# disabled by default.
+# ovsdb.l3.fwd.enabled=yes
+
 # ovsdb can be configured with ml2 to perform l3 forwarding. When used in that scenario, the mac address of the default
 # gateway --on the external subnet-- is expected to be resolved from its inet address. The config below overrides that
 # specific arp/neighDiscovery lookup.
index 530e46e..691d83d 100644 (file)
@@ -116,6 +116,10 @@ ovsdb.listenPort=6640
 # default Openflow version = 1.3, we also support 1.0.
 ovsdb.of.version=1.3
 
+# ovsdb can be configured with ml2 to perform l3 forwarding. The config below enables that functionality, which is
+# disabled by default.
+# ovsdb.l3.fwd.enabled=yes
+
 # ovsdb can be configured with ml2 to perform l3 forwarding. When used in that scenario, the mac address of the default
 # gateway --on the external subnet-- is expected to be resolved from its inet address. The config below overrides that
 # specific arp/neighDiscovery lookup.
index 727f224..444e770 100644 (file)
@@ -2,7 +2,12 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
     <modelVersion>4.0.0</modelVersion>
-
+    <parent>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>releasepom</artifactId>
+      <version>0.1.2-SNAPSHOT</version>
+      <relativePath>../..</relativePath>
+    </parent>
     <groupId>org.opendaylight.controller</groupId>
     <artifactId>karaf.branding</artifactId>
     <version>1.0.0-SNAPSHOT</version>
index d71858e..4fc0cf7 100644 (file)
@@ -139,6 +139,7 @@ public class ComponentActivator extends ComponentActivatorAbstractBase {
     protected Object[] getImplementations() {
         return new Object[] {
                 dataPacketService,
+                inventory,
         };
     }
 
@@ -148,6 +149,8 @@ public class ComponentActivator extends ComponentActivatorAbstractBase {
             _instanceConfigure((ComponentActivator)imp, c, containerName);
         } else if (imp instanceof DataPacketServiceAdapter) {
             _instanceConfigure((DataPacketServiceAdapter)imp, c, containerName);
+        } else if (imp instanceof InventoryAndReadAdapter) {
+            _instanceConfigure((InventoryAndReadAdapter)imp, c, containerName);
         } else {
             throw new IllegalArgumentException(String.format("Unhandled implementation class %s", imp.getClass()));
         }
@@ -215,6 +218,22 @@ public class ComponentActivator extends ComponentActivatorAbstractBase {
                 .setRequired(false));
     }
 
+    private void _instanceConfigure(final InventoryAndReadAdapter imp, final Component it, String containerName) {
+        it.setInterface(new String[] {
+                IPluginInInventoryService.class.getName(),
+                IPluginInReadService.class.getName(),
+        }, properties());
+
+        it.add(createServiceDependency()
+                .setService(IPluginOutReadService.class)
+                .setCallbacks("setReadPublisher", "unsetReadPublisher")
+                .setRequired(false));
+        it.add(createServiceDependency()
+                .setService(IPluginOutInventoryService.class)
+                .setCallbacks("setInventoryPublisher", "unsetInventoryPublisher")
+                .setRequired(false));
+    }
+
     private void _configure(final TopologyAdapter imp, final Component it) {
         it.setInterface(IPluginInTopologyService.class.getName(), properties());
 
index e2c1386..1530e90 100644 (file)
@@ -48,7 +48,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.ta
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsUpdate;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
@@ -241,20 +240,20 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI
      * @param id Table id
      * @return Table contents, or null if not present
      */
-    private Table readConfigTable(final Node node, final short id) {
+    private Table readOperationalTable(final Node node, final short id) {
         final InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class)
-                .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class, InventoryMapping.toNodeKey(node))
+                .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class, NodeMapping.toNodeKey(node))
                 .augmentation(FlowCapableNode.class)
                 .child(Table.class, new TableKey(id))
                 .build();
 
-        return (Table) startChange().readConfigurationData(tableRef);
+        return (Table) startChange().readOperationalData(tableRef);
     }
 
     @Override
     public List<FlowOnNode> readAllFlow(final Node node, final boolean cached) {
         final ArrayList<FlowOnNode> output = new ArrayList<>();
-        final Table table = readConfigTable(node, OPENFLOWV10_TABLE_ID);
+        final Table table = readOperationalTable(node, OPENFLOWV10_TABLE_ID);
         if (table != null) {
             final List<Flow> flows = table.getFlow();
             LOG.trace("Number of flows installed in table 0 of node {} : {}", node, flows.size());
@@ -268,12 +267,6 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI
             }
         }
 
-        // TODO (main): Shall we send request to the switch? It will make async request to the switch.
-        // Once the plugin receives a response, it will let the adaptor know through onFlowStatisticsUpdate()
-        // If we assume that md-sal statistics manager will always be running, then it is not required
-        // But if not, then sending request will collect the latest data for adaptor at least.
-        getFlowStatisticsService().getAllFlowsStatisticsFromAllFlowTables(
-                new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder().setNode(NodeMapping.toNodeRef(node)).build());
         return output;
     }
 
@@ -334,7 +327,7 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI
     @Override
     public FlowOnNode readFlow(final Node node, final org.opendaylight.controller.sal.flowprogrammer.Flow targetFlow, final boolean cached) {
         FlowOnNode ret = null;
-        final Table table = readConfigTable(node, OPENFLOWV10_TABLE_ID);
+        final Table table = readOperationalTable(node, OPENFLOWV10_TABLE_ID);
         if (table != null) {
             final List<Flow> flows = table.getFlow();
             InventoryAndReadAdapter.LOG.trace("Number of flows installed in table 0 of node {} : {}", node, flows.size());
@@ -386,7 +379,7 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI
     @Override
     public NodeTableStatistics readNodeTable(final NodeTable nodeTable, final boolean cached) {
         NodeTableStatistics nodeStats = null;
-        final Table table = readConfigTable(nodeTable.getNode(), (short) nodeTable.getID());
+        final Table table = readOperationalTable(nodeTable.getNode(), (short) nodeTable.getID());
         if (table != null) {
             final FlowTableStatisticsData tableStats = table.getAugmentation(FlowTableStatisticsData.class);
             if (tableStats != null) {
index b873f8a..bcb2367 100644 (file)
@@ -168,7 +168,22 @@ public final class NodeMapping {
      * @return
      */
     private static NodeId toNodeId(org.opendaylight.controller.sal.core.Node aDNode) {
-        return new NodeId(aDNode.getType() + ":" + String.valueOf(aDNode.getID()));
+        String targetPrefix = null;
+        if (NodeIDType.OPENFLOW.equals(aDNode.getType())) {
+                targetPrefix = OPENFLOW_ID_PREFIX;
+        } else {
+            targetPrefix = aDNode.getType() + ":";
+        }
+
+        return new NodeId(targetPrefix + String.valueOf(aDNode.getID()));
+    }
+
+    /**
+     * @param aDNode
+     * @return md-sal {@link NodeKey}
+     */
+    public static NodeKey toNodeKey(org.opendaylight.controller.sal.core.Node aDNode) {
+        return new NodeKey(toNodeId(aDNode));
     }
 
     public static String toNodeConnectorType(final NodeConnectorId ncId, final NodeId nodeId) {
index a776ef2..759e69f 100644 (file)
@@ -196,6 +196,19 @@ public class NodeMappingTest {
         Assert.assertEquals(0xCC4E241C4A000000L, NodeMapping.openflowFullNodeIdToLong("14721743935839928320").longValue());
     }
 
+    /**
+     * Test method for
+     * {@link org.opendaylight.controller.sal.compatibility.NodeMapping#toNodeKey(org.opendaylight.controller.sal.core.Node)}
+     * .
+     * @throws ConstructionException
+     */
+    @Test
+    public void testToNodeKey() throws ConstructionException {
+        org.opendaylight.controller.sal.core.Node aDNode = new org.opendaylight.controller.sal.core.Node(NodeIDType.OPENFLOW, 42L);
+        NodeKey nodeKey = NodeMapping.toNodeKey(aDNode);
+        Assert.assertEquals("openflow:42", nodeKey.getId().getValue());
+    }
+
     /**
      * @param nodeId
      * @param portId
index 9641859..698dbcb 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.frm.impl;
 
-import com.google.common.base.Preconditions;
 import org.opendaylight.controller.frm.ForwardingRulesManager;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
@@ -33,6 +32,8 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  * GroupForwarder
  * It implements {@link org.opendaylight.controller.md.sal.binding.api.DataChangeListener}}
@@ -52,8 +53,27 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
     public FlowForwarder (final ForwardingRulesManager manager, final DataBroker db) {
         super(manager, Flow.class);
         Preconditions.checkNotNull(db, "DataBroker can not be null!");
-        this.listenerRegistration = db.registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
-                getWildCardPath(), FlowForwarder.this, DataChangeScope.SUBTREE);
+        registrationListener(db, 5);
+    }
+
+    private void registrationListener(final DataBroker db, int i) {
+        try {
+            listenerRegistration = db.registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
+                    getWildCardPath(), FlowForwarder.this, DataChangeScope.SUBTREE);
+        } catch (final Exception e) {
+            if (i >= 1) {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e1) {
+                    LOG.error("Thread interrupted '{}'", e1);
+                    Thread.currentThread().interrupt();
+                }
+                registrationListener(db, --i);
+            } else {
+                LOG.error("FRM Flow DataChange listener registration fail!", e);
+                throw new IllegalStateException("FlowForwarder registration Listener fail! System needs restart.", e);
+            }
+        }
     }
 
     @Override
@@ -61,7 +81,7 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
         if (listenerRegistration != null) {
             try {
                 listenerRegistration.close();
-            } catch (Exception e) {
+            } catch (final Exception e) {
                 LOG.error("Error by stop FRM FlowChangeListener.", e);
             }
             listenerRegistration = null;
@@ -80,7 +100,7 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
             builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
             builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
             builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
-            this.provider.getSalFlowService().removeFlow(builder.build());
+            provider.getSalFlowService().removeFlow(builder.build());
         }
     }
 
@@ -99,7 +119,7 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
             builder.setUpdatedFlow((new UpdatedFlowBuilder(update)).build());
             builder.setOriginalFlow((new OriginalFlowBuilder(original)).build());
 
-            this.provider.getSalFlowService().updateFlow(builder.build());
+            provider.getSalFlowService().updateFlow(builder.build());
         }
     }
 
@@ -116,7 +136,7 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
             builder.setFlowRef(new FlowRef(identifier));
             builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
             builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
-            this.provider.getSalFlowService().addFlow(builder.build());
+            provider.getSalFlowService().addFlow(builder.build());
         }
     }
 
index 71a0de9..a9c81b9 100644 (file)
@@ -77,6 +77,7 @@
 
     <!-- XSQL -->
     <module>sal-dom-xsql</module>
+    <module>sal-karaf-xsql</module>
     <module>sal-dom-xsql-config</module>
 
     <!-- Yang Test Models for MD-SAL -->
index 3bfdf73..04df778 100644 (file)
@@ -11,8 +11,10 @@ package org.opendaylight.controller.cluster.example;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.japi.Creator;
+
 import com.google.common.base.Optional;
 import com.google.protobuf.ByteString;
+
 import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
 import org.opendaylight.controller.cluster.example.messages.PrintRole;
@@ -165,4 +167,24 @@ public class ExampleActor extends RaftActor {
     @Override public String persistenceId() {
         return getId();
     }
+
+    @Override
+    protected void startLogRecoveryBatch(int maxBatchSize) {
+    }
+
+    @Override
+    protected void appendRecoveredLogEntry(Payload data) {
+    }
+
+    @Override
+    protected void applyCurrentLogRecoveryBatch() {
+    }
+
+    @Override
+    protected void onRecoveryComplete() {
+    }
+
+    @Override
+    protected void applyRecoverySnapshot(ByteString snapshot) {
+    }
 }
index b436bce..2be4a0c 100644 (file)
@@ -18,13 +18,14 @@ import java.util.List;
  */
 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
-    protected List<ReplicatedLogEntry> journal;
+    // We define this as ArrayList so we can use ensureCapacity.
+    protected ArrayList<ReplicatedLogEntry> journal;
     protected ByteString snapshot;
     protected long snapshotIndex = -1;
     protected long snapshotTerm = -1;
 
     // to be used for rollback during save snapshot failure
-    protected List<ReplicatedLogEntry> snapshottedJournal;
+    protected ArrayList<ReplicatedLogEntry> snapshottedJournal;
     protected ByteString previousSnapshot;
     protected long previousSnapshotIndex = -1;
     protected long previousSnapshotTerm = -1;
@@ -106,6 +107,11 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         journal.add(replicatedLogEntry);
     }
 
+    @Override
+    public void increaseJournalLogCapacity(int amount) {
+        journal.ensureCapacity(journal.size() + amount);
+    }
+
     @Override
     public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
         return getFrom(logEntryIndex, journal.size());
@@ -208,7 +214,6 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     @Override
     public void snapshotCommit() {
-        snapshottedJournal.clear();
         snapshottedJournal = null;
         previousSnapshotIndex = -1;
         previousSnapshotTerm = -1;
@@ -218,7 +223,6 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     @Override
     public void snapshotRollback() {
         snapshottedJournal.addAll(journal);
-        journal.clear();
         journal = snapshottedJournal;
         snapshottedJournal = null;
 
index ed6439d..bff2a27 100644 (file)
@@ -26,7 +26,7 @@ public interface ConfigParams {
      *
      * @return long
      */
-    public long getSnapshotBatchCount();
+    long getSnapshotBatchCount();
 
     /**
      * The interval at which a heart beat message will be sent to the remote
@@ -34,7 +34,7 @@ public interface ConfigParams {
      *
      * @return FiniteDuration
      */
-    public FiniteDuration getHeartBeatInterval();
+    FiniteDuration getHeartBeatInterval();
 
     /**
      * The interval in which a new election would get triggered if no leader is found
@@ -43,7 +43,7 @@ public interface ConfigParams {
      *
      * @return FiniteDuration
      */
-    public FiniteDuration getElectionTimeOutInterval();
+    FiniteDuration getElectionTimeOutInterval();
 
     /**
      * The maximum election time variance. The election is scheduled using both
@@ -51,10 +51,15 @@ public interface ConfigParams {
      *
      * @return int
      */
-    public int getElectionTimeVariance();
+    int getElectionTimeVariance();
 
     /**
      * The size (in bytes) of the snapshot chunk sent from Leader
      */
-    public int getSnapshotChunkSize();
+    int getSnapshotChunkSize();
+
+    /**
+     * The number of journal log entries to batch on recovery before applying.
+     */
+    int getJournalRecoveryLogBatchSize();
 }
index 9d06f63..dc41453 100644 (file)
@@ -20,12 +20,14 @@ public class DefaultConfigParamsImpl implements ConfigParams {
 
     private static final int SNAPSHOT_BATCH_COUNT = 20000;
 
+    private static final int JOURNAL_RECOVERY_LOG_BATCH_SIZE = 1000;
+
     /**
      * The maximum election time variance
      */
     private static final int ELECTION_TIME_MAX_VARIANCE = 100;
 
-    private final int SNAPSHOT_CHUNK_SIZE = 2048 * 1000; //2MB
+    private static final int SNAPSHOT_CHUNK_SIZE = 2048 * 1000; //2MB
 
 
     /**
@@ -39,17 +41,32 @@ public class DefaultConfigParamsImpl implements ConfigParams {
         new FiniteDuration(100, TimeUnit.MILLISECONDS);
 
 
+    private FiniteDuration heartBeatInterval = HEART_BEAT_INTERVAL;
+    private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT;
+    private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE;
+
+    public void setHeartBeatInterval(FiniteDuration heartBeatInterval) {
+        this.heartBeatInterval = heartBeatInterval;
+    }
+
+    public void setSnapshotBatchCount(long snapshotBatchCount) {
+        this.snapshotBatchCount = snapshotBatchCount;
+    }
+
+    public void setJournalRecoveryLogBatchSize(int journalRecoveryLogBatchSize) {
+        this.journalRecoveryLogBatchSize = journalRecoveryLogBatchSize;
+    }
+
     @Override
     public long getSnapshotBatchCount() {
-        return SNAPSHOT_BATCH_COUNT;
+        return snapshotBatchCount;
     }
 
     @Override
     public FiniteDuration getHeartBeatInterval() {
-        return HEART_BEAT_INTERVAL;
+        return heartBeatInterval;
     }
 
-
     @Override
     public FiniteDuration getElectionTimeOutInterval() {
         // returns 2 times the heart beat interval
@@ -65,4 +82,9 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     public int getSnapshotChunkSize() {
         return SNAPSHOT_CHUNK_SIZE;
     }
+
+    @Override
+    public int getJournalRecoveryLogBatchSize() {
+        return journalRecoveryLogBatchSize;
+    }
 }
index 6e1a13c..64fa749 100644 (file)
@@ -20,6 +20,7 @@ import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
 import akka.persistence.UntypedPersistentActor;
 import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
 import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
@@ -39,7 +40,6 @@ import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-
 import java.io.Serializable;
 import java.util.Map;
 
@@ -97,7 +97,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
      * This context should NOT be passed directly to any other actor it is
      * only to be consumed by the RaftActorBehaviors
      */
-    protected RaftActorContext context;
+    private final RaftActorContext context;
 
     /**
      * The in-memory journal
@@ -108,6 +108,10 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
     private volatile boolean hasSnapshotCaptureInitiated = false;
 
+    private Stopwatch recoveryTimer;
+
+    private int currentRecoveryBatchCount;
+
     public RaftActor(String id, Map<String, String> peerAddresses) {
         this(id, peerAddresses, Optional.<ConfigParams>absent());
     }
@@ -122,65 +126,132 @@ public abstract class RaftActor extends UntypedPersistentActor {
             LOG);
     }
 
-    @Override public void onReceiveRecover(Object message) {
+    private void initRecoveryTimer() {
+        if(recoveryTimer == null) {
+            recoveryTimer = new Stopwatch();
+            recoveryTimer.start();
+        }
+    }
+
+    @Override
+    public void preStart() throws Exception {
+        LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(),
+                context.getConfigParams().getJournalRecoveryLogBatchSize());
+        super.preStart();
+    }
+
+    @Override
+    public void onReceiveRecover(Object message) {
         if (message instanceof SnapshotOffer) {
-            LOG.info("SnapshotOffer called..");
-            SnapshotOffer offer = (SnapshotOffer) message;
-            Snapshot snapshot = (Snapshot) offer.snapshot();
+            onRecoveredSnapshot((SnapshotOffer)message);
+        } else if (message instanceof ReplicatedLogEntry) {
+            onRecoveredJournalLogEntry((ReplicatedLogEntry)message);
+        } else if (message instanceof ApplyLogEntries) {
+            onRecoveredApplyLogEntries((ApplyLogEntries)message);
+        } else if (message instanceof DeleteEntries) {
+            replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+        } else if (message instanceof UpdateElectionTerm) {
+            context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
+                    ((UpdateElectionTerm) message).getVotedFor());
+        } else if (message instanceof RecoveryCompleted) {
+            onRecoveryCompletedMessage();
+        }
+    }
 
-            // Create a replicated log with the snapshot information
-            // The replicated log can be used later on to retrieve this snapshot
-            // when we need to install it on a peer
-            replicatedLog = new ReplicatedLogImpl(snapshot);
+    private void onRecoveredSnapshot(SnapshotOffer offer) {
+        LOG.debug("SnapshotOffer called..");
 
-            context.setReplicatedLog(replicatedLog);
-            context.setLastApplied(snapshot.getLastAppliedIndex());
-            context.setCommitIndex(snapshot.getLastAppliedIndex());
+        initRecoveryTimer();
 
-            LOG.info("Applied snapshot to replicatedLog. " +
-                    "snapshotIndex={}, snapshotTerm={}, journal-size={}",
-                replicatedLog.snapshotIndex, replicatedLog.snapshotTerm,
-                replicatedLog.size()
-            );
+        Snapshot snapshot = (Snapshot) offer.snapshot();
 
-            // Apply the snapshot to the actors state
-            applySnapshot(ByteString.copyFrom(snapshot.getState()));
+        // Create a replicated log with the snapshot information
+        // The replicated log can be used later on to retrieve this snapshot
+        // when we need to install it on a peer
+        replicatedLog = new ReplicatedLogImpl(snapshot);
 
-        } else if (message instanceof ReplicatedLogEntry) {
-            ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message;
-            LOG.info("Received ReplicatedLogEntry for recovery:{}", logEntry.getIndex());
-            replicatedLog.append(logEntry);
+        context.setReplicatedLog(replicatedLog);
+        context.setLastApplied(snapshot.getLastAppliedIndex());
+        context.setCommitIndex(snapshot.getLastAppliedIndex());
 
-        } else if (message instanceof ApplyLogEntries) {
-            ApplyLogEntries ale = (ApplyLogEntries) message;
+        Stopwatch timer = new Stopwatch();
+        timer.start();
 
-            LOG.info("Received ApplyLogEntries for recovery, applying to state:{} to {}",
-                context.getLastApplied() + 1, ale.getToIndex());
+        // Apply the snapshot to the actors state
+        applyRecoverySnapshot(ByteString.copyFrom(snapshot.getState()));
 
-            for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
-                applyState(null, "recovery", replicatedLog.get(i).getData());
-            }
-            context.setLastApplied(ale.getToIndex());
-            context.setCommitIndex(ale.getToIndex());
+        timer.stop();
+        LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
+                replicatedLog.size(), persistenceId(), timer.toString(),
+                replicatedLog.snapshotIndex, replicatedLog.snapshotTerm);
+    }
 
-        } else if (message instanceof DeleteEntries) {
-            replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+    private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Received ReplicatedLogEntry for recovery: {}", logEntry.getIndex());
+        }
 
-        } else if (message instanceof UpdateElectionTerm) {
-            context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
-                ((UpdateElectionTerm) message).getVotedFor());
+        replicatedLog.append(logEntry);
+    }
 
-        } else if (message instanceof RecoveryCompleted) {
-            LOG.info(
-                "RecoveryCompleted - Switching actor to Follower - " +
-                    "Persistence Id =  " + persistenceId() +
-                    " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
-                    "journal-size={}",
-                replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
-                replicatedLog.snapshotTerm, replicatedLog.size());
-            currentBehavior = switchBehavior(RaftState.Follower);
-            onStateChanged();
+    private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Received ApplyLogEntries for recovery, applying to state: {} to {}",
+                    context.getLastApplied() + 1, ale.getToIndex());
         }
+
+        for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
+            batchRecoveredLogEntry(replicatedLog.get(i));
+        }
+
+        context.setLastApplied(ale.getToIndex());
+        context.setCommitIndex(ale.getToIndex());
+    }
+
+    private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
+        initRecoveryTimer();
+
+        int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
+        if(currentRecoveryBatchCount == 0) {
+            startLogRecoveryBatch(batchSize);
+        }
+
+        appendRecoveredLogEntry(logEntry.getData());
+
+        if(++currentRecoveryBatchCount >= batchSize) {
+            endCurrentLogRecoveryBatch();
+        }
+    }
+
+    private void endCurrentLogRecoveryBatch() {
+        applyCurrentLogRecoveryBatch();
+        currentRecoveryBatchCount = 0;
+    }
+
+    private void onRecoveryCompletedMessage() {
+        if(currentRecoveryBatchCount > 0) {
+            endCurrentLogRecoveryBatch();
+        }
+
+        onRecoveryComplete();
+
+        String recoveryTime = "";
+        if(recoveryTimer != null) {
+            recoveryTimer.stop();
+            recoveryTime = " in " + recoveryTimer.toString();
+            recoveryTimer = null;
+        }
+
+        LOG.info(
+            "Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
+                "Persistence Id =  " + persistenceId() +
+                " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
+                "journal-size={}",
+            replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
+            replicatedLog.snapshotTerm, replicatedLog.size());
+
+        currentBehavior = switchBehavior(RaftState.Follower);
+        onStateChanged();
     }
 
     @Override public void onReceiveCommand(Object message) {
@@ -198,7 +269,9 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
         } else if (message instanceof ApplyLogEntries){
             ApplyLogEntries ale = (ApplyLogEntries) message;
-            LOG.info("Persisting ApplyLogEntries with index={}", ale.getToIndex());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Persisting ApplyLogEntries with index={}", ale.getToIndex());
+            }
             persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
                 @Override
                 public void apply(ApplyLogEntries param) throws Exception {
@@ -391,6 +464,10 @@ public abstract class RaftActor extends UntypedPersistentActor {
         return context.getLastApplied();
     }
 
+    protected RaftActorContext getRaftActorContext() {
+        return context;
+    }
+
     /**
      * setPeerAddress sets the address of a known peer at a later time.
      * <p>
@@ -431,6 +508,38 @@ public abstract class RaftActor extends UntypedPersistentActor {
     protected abstract void applyState(ActorRef clientActor, String identifier,
         Object data);
 
+    /**
+     * This method is called during recovery at the start of a batch of state entries. Derived
+     * classes should perform any initialization needed to start a batch.
+     */
+    protected abstract void startLogRecoveryBatch(int maxBatchSize);
+
+    /**
+     * This method is called during recovery to append state data to the current batch. This method
+     * is called 1 or more times after {@link #startRecoveryStateBatch}.
+     *
+     * @param data the state data
+     */
+    protected abstract void appendRecoveredLogEntry(Payload data);
+
+    /**
+     * This method is called during recovery to reconstruct the state of the actor.
+     *
+     * @param snapshot A snapshot of the state of the actor
+     */
+    protected abstract void applyRecoverySnapshot(ByteString snapshot);
+
+    /**
+     * This method is called during recovery at the end of a batch to apply the current batched
+     * log entries. This method is called after {@link #appendRecoveryLogEntry}.
+     */
+    protected abstract void applyCurrentLogRecoveryBatch();
+
+    /**
+     * This method is called when recovery is complete.
+     */
+    protected abstract void onRecoveryComplete();
+
     /**
      * This method will be called by the RaftActor when a snapshot needs to be
      * created. The derived actor should respond with its current state.
@@ -443,10 +552,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
     protected abstract void createSnapshot();
 
     /**
-     * This method will be called by the RaftActor during recovery to
-     * reconstruct the state of the actor.
-     * <p/>
-     * This method may also be called at any other point during normal
+     * This method can be called at any other point during normal
      * operations when the derived actor is out of sync with it's peers
      * and the only way to bring it in sync is by applying a snapshot
      *
@@ -603,6 +709,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
             // of a single command.
             persist(replicatedLogEntry,
                 new Procedure<ReplicatedLogEntry>() {
+                    @Override
                     public void apply(ReplicatedLogEntry evt) throws Exception {
                         // when a snaphsot is being taken, captureSnapshot != null
                         if (hasSnapshotCaptureInitiated == false &&
@@ -667,10 +774,12 @@ public abstract class RaftActor extends UntypedPersistentActor {
         private long currentTerm = 0;
         private String votedFor = null;
 
+        @Override
         public long getCurrentTerm() {
             return currentTerm;
         }
 
+        @Override
         public String getVotedFor() {
             return votedFor;
         }
index 25da371..e4aef0a 100644 (file)
@@ -59,26 +59,32 @@ public class RaftActorContextImpl implements RaftActorContext {
         this.LOG = logger;
     }
 
+    @Override
     public ActorRef actorOf(Props props){
         return context.actorOf(props);
     }
 
+    @Override
     public ActorSelection actorSelection(String path){
         return context.actorSelection(path);
     }
 
+    @Override
     public String getId() {
         return id;
     }
 
+    @Override
     public ActorRef getActor() {
         return actor;
     }
 
+    @Override
     public ElectionTerm getTermInformation() {
         return termInformation;
     }
 
+    @Override
     public long getCommitIndex() {
         return commitIndex;
     }
@@ -87,6 +93,7 @@ public class RaftActorContextImpl implements RaftActorContext {
         this.commitIndex = commitIndex;
     }
 
+    @Override
     public long getLastApplied() {
         return lastApplied;
     }
index c17f544..8589333 100644 (file)
@@ -74,6 +74,13 @@ public interface ReplicatedLog {
      */
     void append(ReplicatedLogEntry replicatedLogEntry);
 
+    /**
+     * Optimization method to increase the capacity of the journal log prior to appending entries.
+     *
+     * @param amount the amount to increase by
+     */
+    void increaseJournalLogCapacity(int amount);
+
     /**
      *
      * @param replicatedLogEntry
index 199d2d6..ff8a225 100644 (file)
@@ -88,16 +88,12 @@ public class Leader extends AbstractRaftActorBehavior {
 
         LOG = context.getLogger();
 
-        if (lastIndex() >= 0) {
-            context.setCommitIndex(lastIndex());
-        }
-
         followers = context.getPeerAddresses().keySet();
 
         for (String followerId : followers) {
             FollowerLogInformation followerLogInformation =
                 new FollowerLogInformationImpl(followerId,
-                    new AtomicLong(lastIndex()),
+                    new AtomicLong(context.getCommitIndex()),
                     new AtomicLong(-1));
 
             followerToLog.put(followerId, followerLogInformation);
index ca34a34..0d5f644 100644 (file)
@@ -200,6 +200,10 @@ public class MockRaftActorContext implements RaftActorContext {
     public static class MockPayload extends Payload implements Serializable {
         private String value = "";
 
+        public MockPayload(){
+
+        }
+
         public MockPayload(String s) {
             this.value = s;
         }
@@ -251,4 +255,24 @@ public class MockRaftActorContext implements RaftActorContext {
             return index;
         }
     }
+
+    public static class MockReplicatedLogBuilder {
+        private ReplicatedLog mockLog = new SimpleReplicatedLog();
+
+        public  MockReplicatedLogBuilder createEntries(int start, int end, int term) {
+            for (int i=start; i<end; i++) {
+                this.mockLog.append(new ReplicatedLogImplEntry(i, term, new MockRaftActorContext.MockPayload("foo" + i)));
+            }
+            return this;
+        }
+
+        public  MockReplicatedLogBuilder addEntry(int index, int term, MockPayload payload) {
+            this.mockLog.append(new ReplicatedLogImplEntry(index, term, payload));
+            return this;
+        }
+
+        public ReplicatedLog build() {
+            return this.mockLog;
+        }
+    }
 }
index 998c198..22f3743 100644 (file)
@@ -4,19 +4,22 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.actor.Terminated;
 import akka.event.Logging;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+import com.google.common.base.Optional;
 import com.google.protobuf.ByteString;
 import org.junit.After;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
-
+import scala.concurrent.duration.FiniteDuration;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -27,9 +30,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.TestCase.assertEquals;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import static org.junit.Assert.assertEquals;
 
 public class RaftActorTest extends AbstractActorTest {
 
@@ -42,58 +45,90 @@ public class RaftActorTest extends AbstractActorTest {
 
     public static class MockRaftActor extends RaftActor {
 
-        private boolean applySnapshotCalled = false;
-        private List<Object> state;
+        public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
+            private final Map<String, String> peerAddresses;
+            private final String id;
+            private final Optional<ConfigParams> config;
 
-        public MockRaftActor(String id,
-            Map<String, String> peerAddresses) {
-            super(id, peerAddresses);
-            state = new ArrayList<>();
+            private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
+                    Optional<ConfigParams> config) {
+                this.peerAddresses = peerAddresses;
+                this.id = id;
+                this.config = config;
+            }
+
+            @Override
+            public MockRaftActor create() throws Exception {
+                return new MockRaftActor(id, peerAddresses, config);
+            }
         }
 
-        public RaftActorContext getRaftActorContext() {
-            return context;
+        private final CountDownLatch recoveryComplete = new CountDownLatch(1);
+        private final List<Object> state;
+
+        public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config) {
+            super(id, peerAddresses, config);
+            state = new ArrayList<>();
         }
 
-        public boolean isApplySnapshotCalled() {
-            return applySnapshotCalled;
+        public void waitForRecoveryComplete() {
+            try {
+                assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
         }
 
         public List<Object> getState() {
             return state;
         }
 
-        public static Props props(final String id, final Map<String, String> peerAddresses){
-            return Props.create(new Creator<MockRaftActor>(){
-
-                @Override public MockRaftActor create() throws Exception {
-                    return new MockRaftActor(id, peerAddresses);
-                }
-            });
+        public static Props props(final String id, final Map<String, String> peerAddresses,
+                Optional<ConfigParams> config){
+            return Props.create(new MockRaftActorCreator(peerAddresses, id, config));
         }
 
         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
+        }
+
+        @Override
+        protected void startLogRecoveryBatch(int maxBatchSize) {
+        }
+
+        @Override
+        protected void appendRecoveredLogEntry(Payload data) {
             state.add(data);
         }
 
-        @Override protected void createSnapshot() {
-            throw new UnsupportedOperationException("createSnapshot");
+        @Override
+        protected void applyCurrentLogRecoveryBatch() {
         }
 
-        @Override protected void applySnapshot(ByteString snapshot) {
-            applySnapshotCalled = true;
+        @Override
+        protected void onRecoveryComplete() {
+            recoveryComplete.countDown();
+        }
+
+        @Override
+        protected void applyRecoverySnapshot(ByteString snapshot) {
             try {
                 Object data = toObject(snapshot);
+                System.out.println("!!!!!applyRecoverySnapshot: "+data);
                 if (data instanceof List) {
                     state.addAll((List) data);
                 }
-            } catch (ClassNotFoundException e) {
-                e.printStackTrace();
-            } catch (IOException e) {
+            } catch (Exception e) {
                 e.printStackTrace();
             }
         }
 
+        @Override protected void createSnapshot() {
+            throw new UnsupportedOperationException("createSnapshot");
+        }
+
+        @Override protected void applySnapshot(ByteString snapshot) {
+        }
+
         @Override protected void onStateChanged() {
         }
 
@@ -130,9 +165,8 @@ public class RaftActorTest extends AbstractActorTest {
         public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
             super(actorSystem);
 
-            raftActor = this.getSystem()
-                .actorOf(MockRaftActor.props(actorName,
-                    Collections.EMPTY_MAP), actorName);
+            raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
+                    Collections.EMPTY_MAP, Optional.<ConfigParams>absent()), actorName);
 
         }
 
@@ -142,6 +176,7 @@ public class RaftActorTest extends AbstractActorTest {
             return
                 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
                 ) {
+                    @Override
                     protected Boolean run() {
                         return true;
                     }
@@ -153,37 +188,15 @@ public class RaftActorTest extends AbstractActorTest {
         }
 
         public void findLeader(final String expectedLeader){
+            raftActor.tell(new FindLeader(), getRef());
 
-
-            new Within(duration("1 seconds")) {
-                protected void run() {
-
-                    raftActor.tell(new FindLeader(), getRef());
-
-                    String s = new ExpectMsg<String>(duration("1 seconds"),
-                        "findLeader") {
-                        // do not put code outside this method, will run afterwards
-                        protected String match(Object in) {
-                            if (in instanceof FindLeaderReply) {
-                                return ((FindLeaderReply) in).getLeaderActor();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get();// this extracts the received message
-
-                    assertEquals(expectedLeader, s);
-
-                }
-
-
-            };
+            FindLeaderReply reply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
+            assertEquals("getLeaderActor", expectedLeader, reply.getLeaderActor());
         }
 
         public ActorRef getRaftActor() {
             return raftActor;
         }
-
     }
 
 
@@ -201,89 +214,84 @@ public class RaftActorTest extends AbstractActorTest {
     }
 
     @Test
-    public void testRaftActorRecovery() {
+    public void testRaftActorRecovery() throws Exception {
         new JavaTestKit(getSystem()) {{
-            new Within(duration("1 seconds")) {
-                protected void run() {
-
-                    String persistenceId = "follower10";
-
-                    ActorRef followerActor = getSystem().actorOf(
-                        MockRaftActor.props(persistenceId, Collections.EMPTY_MAP), persistenceId);
-
-                    List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
-                    ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("E"));
-                    snapshotUnappliedEntries.add(entry1);
-
-                    int lastAppliedDuringSnapshotCapture = 3;
-                    int lastIndexDuringSnapshotCapture = 4;
-
-                    ByteString snapshotBytes = null;
-                    try {
-                        // 4 messages as part of snapshot, which are applied to state
-                        snapshotBytes  = fromObject(Arrays.asList(new MockRaftActorContext.MockPayload("A"),
-                            new MockRaftActorContext.MockPayload("B"),
-                            new MockRaftActorContext.MockPayload("C"),
-                            new MockRaftActorContext.MockPayload("D")));
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                    Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
-                        snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
-                        lastAppliedDuringSnapshotCapture, 1);
-                    MockSnapshotStore.setMockSnapshot(snapshot);
-                    MockSnapshotStore.setPersistenceId(persistenceId);
-
-                    // add more entries after snapshot is taken
-                    List<ReplicatedLogEntry> entries = new ArrayList<>();
-                    ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("F"));
-                    ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6, new MockRaftActorContext.MockPayload("G"));
-                    ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7, new MockRaftActorContext.MockPayload("H"));
-                    entries.add(entry2);
-                    entries.add(entry3);
-                    entries.add(entry4);
-
-                    int lastAppliedToState = 5;
-                    int lastIndex = 7;
-
-                    MockAkkaJournal.addToJournal(5, entry2);
-                    // 2 entries are applied to state besides the 4 entries in snapshot
-                    MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
-                    MockAkkaJournal.addToJournal(7, entry3);
-                    MockAkkaJournal.addToJournal(8, entry4);
-
-                    // kill the actor
-                    followerActor.tell(PoisonPill.getInstance(), null);
-
-                    try {
-                        // give some time for actor to die
-                        Thread.sleep(200);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
-
-                    //reinstate the actor
-                    TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
-                        MockRaftActor.props(persistenceId, Collections.EMPTY_MAP));
-
-                    try {
-                        //give some time for snapshot offer to get called.
-                        Thread.sleep(200);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
-
-                    RaftActorContext context = ref.underlyingActor().getRaftActorContext();
-                    assertEquals(snapshotUnappliedEntries.size() + entries.size(), context.getReplicatedLog().size());
-                    assertEquals(lastIndex, context.getReplicatedLog().lastIndex());
-                    assertEquals(lastAppliedToState, context.getLastApplied());
-                    assertEquals(lastAppliedToState, context.getCommitIndex());
-                    assertTrue(ref.underlyingActor().isApplySnapshotCalled());
-                    assertEquals(6, ref.underlyingActor().getState().size());
-                }
-            };
+            String persistenceId = "follower10";
+
+            DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+            // Set the heartbeat interval high to essentially disable election otherwise the test
+            // may fail if the actor is switched to Leader and the commitIndex is set to the last
+            // log entry.
+            config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+            ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId,
+                    Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
+
+            watch(followerActor);
+
+            List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
+            ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
+                    new MockRaftActorContext.MockPayload("E"));
+            snapshotUnappliedEntries.add(entry1);
+
+            int lastAppliedDuringSnapshotCapture = 3;
+            int lastIndexDuringSnapshotCapture = 4;
+
+                // 4 messages as part of snapshot, which are applied to state
+            ByteString snapshotBytes  = fromObject(Arrays.asList(
+                        new MockRaftActorContext.MockPayload("A"),
+                        new MockRaftActorContext.MockPayload("B"),
+                        new MockRaftActorContext.MockPayload("C"),
+                        new MockRaftActorContext.MockPayload("D")));
+
+            Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
+                    snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
+                    lastAppliedDuringSnapshotCapture, 1);
+            MockSnapshotStore.setMockSnapshot(snapshot);
+            MockSnapshotStore.setPersistenceId(persistenceId);
+
+            // add more entries after snapshot is taken
+            List<ReplicatedLogEntry> entries = new ArrayList<>();
+            ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
+                    new MockRaftActorContext.MockPayload("F"));
+            ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
+                    new MockRaftActorContext.MockPayload("G"));
+            ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
+                    new MockRaftActorContext.MockPayload("H"));
+            entries.add(entry2);
+            entries.add(entry3);
+            entries.add(entry4);
+
+            int lastAppliedToState = 5;
+            int lastIndex = 7;
+
+            MockAkkaJournal.addToJournal(5, entry2);
+            // 2 entries are applied to state besides the 4 entries in snapshot
+            MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
+            MockAkkaJournal.addToJournal(7, entry3);
+            MockAkkaJournal.addToJournal(8, entry4);
+
+            // kill the actor
+            followerActor.tell(PoisonPill.getInstance(), null);
+            expectMsgClass(duration("5 seconds"), Terminated.class);
+
+            unwatch(followerActor);
+
+            //reinstate the actor
+            TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
+                    MockRaftActor.props(persistenceId, Collections.EMPTY_MAP,
+                            Optional.<ConfigParams>of(config)));
+
+            ref.underlyingActor().waitForRecoveryComplete();
+
+            RaftActorContext context = ref.underlyingActor().getRaftActorContext();
+            assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
+                    context.getReplicatedLog().size());
+            assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
+            assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
+            assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
+            assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
         }};
-
     }
 
     private ByteString fromObject(Object snapshot) throws Exception {
index fd4a75a..a72a7c4 100644 (file)
@@ -3,7 +3,6 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
-import akka.util.Timeout;
 import com.google.protobuf.ByteString;
 import junit.framework.Assert;
 import org.junit.Test;
@@ -22,10 +21,6 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -35,9 +30,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
-import static akka.pattern.Patterns.ask;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -527,15 +520,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
     }
 
     public Object executeLocalOperation(ActorRef actor, Object message) throws Exception {
-        FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
-        Timeout operationTimeout = new Timeout(operationDuration);
-        Future<Object> future = ask(actor, message, operationTimeout);
-
-        try {
-            return Await.result(future, operationDuration);
-        } catch (Exception e) {
-            throw e;
-        }
+        return MessageCollectorActor.getAllMessages(actor);
     }
 
     public ByteString getNextChunk (ByteString bs, int offset){
index c4ef51d..19af647 100644 (file)
@@ -20,9 +20,12 @@ import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
 
 import java.io.ByteArrayOutputStream;
@@ -171,18 +174,13 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
                     actorContext.getReplicatedLog().removeFrom(0);
 
-                    actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(0, 1,
-                        new MockRaftActorContext.MockPayload("foo")));
-
-                    ReplicatedLogImplEntry entry =
-                        new ReplicatedLogImplEntry(1, 1,
-                            new MockRaftActorContext.MockPayload("foo"));
-
-                    actorContext.getReplicatedLog().append(entry);
+                    actorContext.setReplicatedLog(
+                        new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
+                            .build());
 
                     Leader leader = new Leader(actorContext);
                     RaftState raftState = leader
-                        .handleMessage(senderActor, new Replicate(null, "state-id",entry));
+                        .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
 
                     // State should not change
                     assertEquals(RaftState.Leader, raftState);
@@ -335,7 +333,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                         new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
                             new MockRaftActorContext.MockPayload("D"));
 
-
                     RaftState raftState = leader.handleMessage(senderActor, new SendInstallSnapshot());
 
                     assertEquals(RaftState.Leader, raftState);
@@ -526,6 +523,157 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         return null;
     }
 
+    public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
+        private static AbstractRaftActorBehavior behavior;
+
+        public ForwardMessageToBehaviorActor(){
+
+        }
+
+        @Override public void onReceive(Object message) throws Exception {
+            super.onReceive(message);
+            behavior.handleMessage(sender(), message);
+        }
+
+        public static void setBehavior(AbstractRaftActorBehavior behavior){
+            ForwardMessageToBehaviorActor.behavior = behavior;
+        }
+    }
+
+    @Test
+    public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+            MockRaftActorContext leaderActorContext =
+                new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+            ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
+
+            MockRaftActorContext followerActorContext =
+                new MockRaftActorContext("follower", getSystem(), followerActor);
+
+            Follower follower = new Follower(followerActorContext);
+
+            ForwardMessageToBehaviorActor.setBehavior(follower);
+
+            Map<String, String> peerAddresses = new HashMap();
+            peerAddresses.put(followerActor.path().toString(),
+                followerActor.path().toString());
+
+            leaderActorContext.setPeerAddresses(peerAddresses);
+
+            leaderActorContext.getReplicatedLog().removeFrom(0);
+
+            //create 3 entries
+            leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+            leaderActorContext.setCommitIndex(1);
+
+            followerActorContext.getReplicatedLog().removeFrom(0);
+
+            // follower too has the exact same log entries and has the same commit index
+            followerActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+            followerActorContext.setCommitIndex(1);
+
+            Leader leader = new Leader(leaderActorContext);
+
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+
+            AppendEntriesMessages.AppendEntries appendEntries =
+                (AppendEntriesMessages.AppendEntries) MessageCollectorActor
+                    .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
+
+            assertNotNull(appendEntries);
+
+            assertEquals(1, appendEntries.getLeaderCommit());
+            assertEquals(1, appendEntries.getLogEntries(0).getIndex());
+            assertEquals(0, appendEntries.getPrevLogIndex());
+
+            AppendEntriesReply appendEntriesReply =
+                (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
+                    leaderActor, AppendEntriesReply.class);
+
+            assertNotNull(appendEntriesReply);
+
+            // follower returns its next index
+            assertEquals(2, appendEntriesReply.getLogLastIndex());
+            assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+        }};
+    }
+
+
+    @Test
+    public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+            MockRaftActorContext leaderActorContext =
+                new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+            ActorRef followerActor = getSystem().actorOf(
+                Props.create(ForwardMessageToBehaviorActor.class));
+
+            MockRaftActorContext followerActorContext =
+                new MockRaftActorContext("follower", getSystem(), followerActor);
+
+            Follower follower = new Follower(followerActorContext);
+
+            ForwardMessageToBehaviorActor.setBehavior(follower);
+
+            Map<String, String> peerAddresses = new HashMap();
+            peerAddresses.put(followerActor.path().toString(),
+                followerActor.path().toString());
+
+            leaderActorContext.setPeerAddresses(peerAddresses);
+
+            leaderActorContext.getReplicatedLog().removeFrom(0);
+
+            leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+            leaderActorContext.setCommitIndex(1);
+
+            followerActorContext.getReplicatedLog().removeFrom(0);
+
+            followerActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+            // follower has the same log entries but its commit index > leaders commit index
+            followerActorContext.setCommitIndex(2);
+
+            Leader leader = new Leader(leaderActorContext);
+
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+
+            AppendEntriesMessages.AppendEntries appendEntries =
+                (AppendEntriesMessages.AppendEntries) MessageCollectorActor
+                    .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
+
+            assertNotNull(appendEntries);
+
+            assertEquals(1, appendEntries.getLeaderCommit());
+            assertEquals(1, appendEntries.getLogEntries(0).getIndex());
+            assertEquals(0, appendEntries.getPrevLogIndex());
+
+            AppendEntriesReply appendEntriesReply =
+                (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
+                    leaderActor, AppendEntriesReply.class);
+
+            assertNotNull(appendEntriesReply);
+
+            assertEquals(2, appendEntriesReply.getLogLastIndex());
+            assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+        }};
+    }
+
     private static class LeaderTestKit extends JavaTestKit {
 
         private LeaderTestKit(ActorSystem actorSystem) {
index 88eecfe..5892845 100644 (file)
@@ -8,10 +8,18 @@
 
 package org.opendaylight.controller.cluster.raft.utils;
 
+import akka.actor.ActorRef;
 import akka.actor.UntypedActor;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 
 public class MessageCollectorActor extends UntypedActor {
@@ -26,4 +34,35 @@ public class MessageCollectorActor extends UntypedActor {
             messages.add(message);
         }
     }
+
+    public static List<Object> getAllMessages(ActorRef actor) throws Exception {
+        FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
+        Timeout operationTimeout = new Timeout(operationDuration);
+        Future<Object> future = Patterns.ask(actor, "get-all-messages", operationTimeout);
+
+        try {
+            return (List<Object>) Await.result(future, operationDuration);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    /**
+     * Get the first message that matches the specified class
+     * @param actor
+     * @param clazz
+     * @return
+     */
+    public static Object getFirstMatching(ActorRef actor, Class clazz) throws Exception {
+        List<Object> allMessages = getAllMessages(actor);
+
+        for(Object message : allMessages){
+            if(message.getClass().equals(clazz)){
+                return message;
+            }
+        }
+
+        return null;
+    }
+
 }
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java
new file mode 100644 (file)
index 0000000..36b2866
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.common.actor;
+
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.persistence.UntypedPersistentActor;
+
+public abstract class AbstractUntypedPersistentActor extends UntypedPersistentActor {
+
+    protected final LoggingAdapter LOG =
+        Logging.getLogger(getContext().system(), this);
+
+    public AbstractUntypedPersistentActor() {
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Actor created {}", getSelf());
+        }
+        getContext().
+            system().
+            actorSelection("user/termination-monitor").
+            tell(new Monitor(getSelf()), getSelf());
+
+    }
+
+
+    @Override public void onReceiveCommand(Object message) throws Exception {
+        final String messageType = message.getClass().getSimpleName();
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Received message {}", messageType);
+        }
+        handleCommand(message);
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Done handling message {}", messageType);
+        }
+
+    }
+
+    @Override public void onReceiveRecover(Object message) throws Exception {
+        final String messageType = message.getClass().getSimpleName();
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Received message {}", messageType);
+        }
+        handleRecover(message);
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Done handling message {}", messageType);
+        }
+
+    }
+
+    protected abstract void handleRecover(Object message) throws Exception;
+
+    protected abstract void handleCommand(Object message) throws Exception;
+
+    protected void ignoreMessage(Object message) {
+        LOG.debug("Unhandled message {} ", message);
+    }
+
+    protected void unknownMessage(Object message) throws Exception {
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Received unhandled message {}", message);
+        }
+        unhandled(message);
+    }
+}
index 3e1bd35..44da4a5 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.datastore.node.utils.serialization;
 
 import com.google.common.base.Preconditions;
+
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
 import org.opendaylight.yangtools.yang.common.SimpleDateFormatUtil;
 import org.opendaylight.yangtools.yang.data.api.Node;
@@ -36,6 +37,7 @@ import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.NormalizedNo
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -232,7 +234,7 @@ public class NormalizedNodeSerializer {
 
     private static class DeSerializer implements NormalizedNodeDeSerializationContext {
         private static Map<NormalizedNodeType, DeSerializationFunction>
-            deSerializationFunctions = new HashMap<>();
+            deSerializationFunctions = new EnumMap<>(NormalizedNodeType.class);
 
         static {
             deSerializationFunctions.put(CONTAINER_NODE_TYPE,
@@ -447,8 +449,9 @@ public class NormalizedNodeSerializer {
 
         private NormalizedNode deSerialize(NormalizedNodeMessages.Node node){
             Preconditions.checkNotNull(node, "node should not be null");
-            DeSerializationFunction deSerializationFunction =
-                Preconditions.checkNotNull(deSerializationFunctions.get(NormalizedNodeType.values()[node.getIntType()]), "Unknown type " + node);
+
+            DeSerializationFunction deSerializationFunction = deSerializationFunctions.get(
+                    NormalizedNodeType.values()[node.getIntType()]);
 
             return deSerializationFunction.apply(this, node);
         }
@@ -544,8 +547,4 @@ public class NormalizedNodeSerializer {
             NormalizedNode apply(DeSerializer deserializer, NormalizedNodeMessages.Node node);
         }
     }
-
-
-
-
 }
index d7627c0..4fb676e 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.datastore.node.utils.serialization;
 
 import com.google.common.base.Preconditions;
+
 import org.opendaylight.controller.cluster.datastore.node.utils.NodeIdentifierFactory;
 import org.opendaylight.controller.cluster.datastore.node.utils.QNameFactory;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
@@ -26,6 +27,7 @@ import java.util.Set;
 import static org.opendaylight.controller.cluster.datastore.node.utils.serialization.PathArgumentType.getSerializablePathArgumentType;
 
 public class PathArgumentSerializer {
+    private static final String REVISION_ARG = "?revision=";
     private static final Map<Class, PathArgumentAttributesGetter> pathArgumentAttributesGetters = new HashMap<>();
 
     public static NormalizedNodeMessages.PathArgument serialize(NormalizedNodeSerializationContext context, YangInstanceIdentifier.PathArgument pathArgument){
@@ -190,27 +192,24 @@ public class PathArgumentSerializer {
         // If this serializer is used qName cannot be null (see encodeQName)
         // adding null check only in case someone tried to deSerialize a protocol buffer node
         // that was not serialized using the PathArgumentSerializer
-        Preconditions.checkNotNull(qName, "qName should not be null");
-        Preconditions.checkArgument(!"".equals(qName.getLocalName()),
-            "qName.localName cannot be empty qName = " + qName.toString());
-        Preconditions.checkArgument(qName.getNamespace() != -1, "qName.namespace should be valid");
+//        Preconditions.checkNotNull(qName, "qName should not be null");
+//        Preconditions.checkArgument(qName.getNamespace() != -1, "qName.namespace should be valid");
 
-        StringBuilder sb = new StringBuilder();
         String namespace = context.getNamespace(qName.getNamespace());
-        String revision = "";
         String localName = context.getLocalName(qName.getLocalName());
+        StringBuilder sb;
         if(qName.getRevision() != -1){
-            revision = context.getRevision(qName.getRevision());
-            sb.append("(").append(namespace).append("?revision=").append(
-                revision).append(")").append(
-                localName);
+            String revision = context.getRevision(qName.getRevision());
+            sb = new StringBuilder(namespace.length() + REVISION_ARG.length() + revision.length() +
+                    localName.length() + 2);
+            sb.append('(').append(namespace).append(REVISION_ARG).append(
+                revision).append(')').append(localName);
         } else {
-            sb.append("(").append(namespace).append(")").append(
-                localName);
+            sb = new StringBuilder(namespace.length() + localName.length() + 2);
+            sb.append('(').append(namespace).append(')').append(localName);
         }
 
         return sb.toString();
-
     }
 
     /**
@@ -223,10 +222,6 @@ public class PathArgumentSerializer {
         NormalizedNodeDeSerializationContext context,
         NormalizedNodeMessages.PathArgument pathArgument) {
 
-        Preconditions.checkArgument(pathArgument.getIntType() >= 0
-            && pathArgument.getIntType() < PathArgumentType.values().length,
-            "Illegal PathArgumentType " + pathArgument.getIntType());
-
         switch(PathArgumentType.values()[pathArgument.getIntType()]){
             case NODE_IDENTIFIER_WITH_VALUE : {
 
@@ -272,13 +267,21 @@ public class PathArgumentSerializer {
         NormalizedNodeDeSerializationContext context,
         List<NormalizedNodeMessages.PathArgumentAttribute> attributesList) {
 
-        Map<QName, Object> map = new HashMap<>();
-
-        for(NormalizedNodeMessages.PathArgumentAttribute attribute : attributesList){
+        Map<QName, Object> map;
+        if(attributesList.size() == 1) {
+            NormalizedNodeMessages.PathArgumentAttribute attribute = attributesList.get(0);
             NormalizedNodeMessages.QName name = attribute.getName();
             Object value = parseAttribute(context, attribute);
+            map = Collections.singletonMap(QNameFactory.create(qNameToString(context, name)), value);
+        } else {
+            map = new HashMap<>();
+
+            for(NormalizedNodeMessages.PathArgumentAttribute attribute : attributesList){
+                NormalizedNodeMessages.QName name = attribute.getName();
+                Object value = parseAttribute(context, attribute);
 
-            map.put(QNameFactory.create(qNameToString(context, name)), value);
+                map.put(QNameFactory.create(qNameToString(context, name)), value);
+            }
         }
 
         return map;
index 04c95d6..8def754 100644 (file)
@@ -8,7 +8,6 @@
 
 package org.opendaylight.controller.cluster.datastore.node.utils.serialization;
 
-import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.node.utils.QNameFactory;
 import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
@@ -70,9 +69,6 @@ public class ValueSerializer {
 
 
     private static Object deSerializeBasicTypes(int valueType, String value) {
-        Preconditions.checkArgument(valueType >= 0 && valueType < ValueType.values().length,
-            "Illegal value type " + valueType );
-
         switch(ValueType.values()[valueType]){
            case SHORT_TYPE: {
                return Short.valueOf(value);
index 8724dfe..49db896 100644 (file)
@@ -50,8 +50,9 @@ public enum ValueType {
     public static final ValueType getSerializableType(Object node){
         Preconditions.checkNotNull(node, "node should not be null");
 
-        if(types.containsKey(node.getClass())) {
-            return types.get(node.getClass());
+        ValueType type = types.get(node.getClass());
+        if(type != null) {
+            return type;
         } else if(node instanceof Set){
             return BITS_TYPE;
         }
index 1021dde..83164b0 100644 (file)
@@ -8,11 +8,12 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
-import com.google.common.base.Preconditions;
-
+import org.opendaylight.controller.cluster.raft.ConfigParams;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 
 import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.util.concurrent.TimeUnit;
 
@@ -27,22 +28,30 @@ public class DatastoreContext {
     private final Duration shardTransactionIdleTimeout;
     private final int operationTimeoutInSeconds;
     private final String dataStoreMXBeanType;
+    private final ConfigParams shardRaftConfig;
 
     public DatastoreContext() {
-        this.dataStoreProperties = null;
-        this.dataStoreMXBeanType = "DistributedDatastore";
-        this.shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES);
-        this.operationTimeoutInSeconds = 5;
+        this("DistributedDatastore", null, Duration.create(10, TimeUnit.MINUTES), 5, 1000, 20000, 500);
     }
 
     public DatastoreContext(String dataStoreMXBeanType,
             InMemoryDOMDataStoreConfigProperties dataStoreProperties,
             Duration shardTransactionIdleTimeout,
-            int operationTimeoutInSeconds) {
+            int operationTimeoutInSeconds,
+            int shardJournalRecoveryLogBatchSize,
+            int shardSnapshotBatchCount,
+            int shardHeartbeatIntervalInMillis) {
         this.dataStoreMXBeanType = dataStoreMXBeanType;
-        this.dataStoreProperties = Preconditions.checkNotNull(dataStoreProperties);
+        this.dataStoreProperties = dataStoreProperties;
         this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
         this.operationTimeoutInSeconds = operationTimeoutInSeconds;
+
+        DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
+        raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
+                TimeUnit.MILLISECONDS));
+        raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
+        raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
+        shardRaftConfig = raftConfig;
     }
 
     public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
@@ -60,4 +69,8 @@ public class DatastoreContext {
     public int getOperationTimeoutInSeconds() {
         return operationTimeoutInSeconds;
     }
+
+    public ConfigParams getShardRaftConfig() {
+        return shardRaftConfig;
+    }
 }
index 0fa2770..ddb5989 100644 (file)
@@ -20,6 +20,7 @@ import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -47,12 +48,11 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
-import org.opendaylight.controller.cluster.raft.ConfigParams;
-import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
@@ -67,14 +67,12 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 
 /**
  * A Shard represents a portion of the logical data tree <br/>
@@ -84,8 +82,6 @@ import java.util.concurrent.TimeUnit;
  */
 public class Shard extends RaftActor {
 
-    private static final ConfigParams configParams = new ShardConfigParams();
-
     public static final String DEFAULT_NAME = "default";
 
     // The state of this Shard
@@ -114,11 +110,18 @@ public class Shard extends RaftActor {
 
     private ActorRef createSnapshotTransaction;
 
+    /**
+     * Coordinates persistence recovery on startup.
+     */
+    private ShardRecoveryCoordinator recoveryCoordinator;
+    private List<Object> currentLogRecoveryBatch;
+
     private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
 
-    private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
+    protected Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
             DatastoreContext datastoreContext, SchemaContext schemaContext) {
-        super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams));
+        super(name.toString(), mapPeerAddresses(peerAddresses),
+                Optional.of(datastoreContext.getShardRaftConfig()));
 
         this.name = name;
         this.datastoreContext = datastoreContext;
@@ -333,35 +336,12 @@ public class Shard extends RaftActor {
         DOMStoreThreePhaseCommitCohort cohort =
             modificationToCohort.remove(serialized);
         if (cohort == null) {
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug(
-                    "Could not find cohort for modification : {}. Writing modification using a new transaction",
-                    modification);
-            }
-
-            DOMStoreWriteTransaction transaction =
-                store.newWriteOnlyTransaction();
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("Created new transaction {}", transaction.getIdentifier().toString());
-            }
-
-            modification.apply(transaction);
-            try {
-                syncCommitTransaction(transaction);
-            } catch (InterruptedException | ExecutionException e) {
-                shardMBean.incrementFailedTransactionsCount();
-                LOG.error("Failed to commit", e);
-                return;
-            }
-            //we want to just apply the recovery commit and return
-            shardMBean.incrementCommittedTransactionCount();
+            // If there's no cached cohort then we must be applying replicated state.
+            commitWithNewTransaction(serialized);
             return;
         }
 
-
-        if(sender == null){
+        if(sender == null) {
             LOG.error("Commit failed. Sender cannot be null");
             return;
         }
@@ -386,6 +366,18 @@ public class Shard extends RaftActor {
 
     }
 
+    private void commitWithNewTransaction(Object modification) {
+        DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
+        MutableCompositeModification.fromSerializable(modification, schemaContext).apply(tx);
+        try {
+            syncCommitTransaction(tx);
+            shardMBean.incrementCommittedTransactionCount();
+        } catch (InterruptedException | ExecutionException e) {
+            shardMBean.incrementFailedTransactionsCount();
+            LOG.error(e, "Failed to commit");
+        }
+    }
+
     private void handleForwardedCommit(ForwardedCommitTransaction message) {
         Object serializedModification =
             message.getModification().toSerializable();
@@ -461,26 +453,102 @@ public class Shard extends RaftActor {
         return config.isMetricCaptureEnabled();
     }
 
-    @Override protected void applyState(ActorRef clientActor, String identifier,
-        Object data) {
+    @Override
+    protected
+    void startLogRecoveryBatch(int maxBatchSize) {
+        currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("{} : starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
+        }
+    }
+
+    @Override
+    protected void appendRecoveredLogEntry(Payload data) {
+        if (data instanceof CompositeModificationPayload) {
+            currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
+        } else {
+            LOG.error("Unknown state received {} during recovery", data);
+        }
+    }
+
+    @Override
+    protected void applyRecoverySnapshot(ByteString snapshot) {
+        if(recoveryCoordinator == null) {
+            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
+        }
+
+        recoveryCoordinator.submit(snapshot, store.newWriteOnlyTransaction());
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("{} : submitted recovery sbapshot", persistenceId());
+        }
+    }
+
+    @Override
+    protected void applyCurrentLogRecoveryBatch() {
+        if(recoveryCoordinator == null) {
+            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
+        }
+
+        recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("{} : submitted log recovery batch with size {}", persistenceId(),
+                    currentLogRecoveryBatch.size());
+        }
+    }
+
+    @Override
+    protected void onRecoveryComplete() {
+        if(recoveryCoordinator != null) {
+            Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
+
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("{} : recovery complete - committing {} Tx's", persistenceId(), txList.size());
+            }
+
+            for(DOMStoreWriteTransaction tx: txList) {
+                try {
+                    syncCommitTransaction(tx);
+                    shardMBean.incrementCommittedTransactionCount();
+                } catch (InterruptedException | ExecutionException e) {
+                    shardMBean.incrementFailedTransactionsCount();
+                    LOG.error(e, "Failed to commit");
+                }
+            }
+        }
+
+        recoveryCoordinator = null;
+        currentLogRecoveryBatch = null;
+        updateJournalStats();
+    }
+
+    @Override
+    protected void applyState(ActorRef clientActor, String identifier, Object data) {
 
         if (data instanceof CompositeModificationPayload) {
-            Object modification =
-                ((CompositeModificationPayload) data).getModification();
+            Object modification = ((CompositeModificationPayload) data).getModification();
 
             if (modification != null) {
                 commit(clientActor, modification);
             } else {
                 LOG.error(
                     "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
-                    identifier, clientActor.path().toString());
+                    identifier, clientActor != null ? clientActor.path().toString() : null);
             }
 
         } else {
-            LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}", data, data.getClass().getClassLoader(), CompositeModificationPayload.class.getClassLoader());
+            LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
+                    data, data.getClass().getClassLoader(),
+                    CompositeModificationPayload.class.getClassLoader());
         }
 
-        // Update stats
+        updateJournalStats();
+
+    }
+
+    private void updateJournalStats() {
         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
 
         if (lastLogEntry != null) {
@@ -490,10 +558,10 @@ public class Shard extends RaftActor {
 
         shardMBean.setCommitIndex(getCommitIndex());
         shardMBean.setLastApplied(getLastApplied());
-
     }
 
-    @Override protected void createSnapshot() {
+    @Override
+    protected void createSnapshot() {
         if (createSnapshotTransaction == null) {
 
             // Create a transaction. We are really going to treat the transaction as a worker
@@ -508,7 +576,9 @@ public class Shard extends RaftActor {
         }
     }
 
-    @VisibleForTesting @Override protected void applySnapshot(ByteString snapshot) {
+    @VisibleForTesting
+    @Override
+    protected void applySnapshot(ByteString snapshot) {
         // Since this will be done only on Recovery or when this actor is a Follower
         // we can safely commit everything in here. We not need to worry about event notifications
         // as they would have already been disabled on the follower
@@ -565,16 +635,6 @@ public class Shard extends RaftActor {
         return this.name.toString();
     }
 
-
-    private static class ShardConfigParams extends DefaultConfigParamsImpl {
-        public static final FiniteDuration HEART_BEAT_INTERVAL =
-            new FiniteDuration(500, TimeUnit.MILLISECONDS);
-
-        @Override public FiniteDuration getHeartBeatInterval() {
-            return HEART_BEAT_INTERVAL;
-        }
-    }
-
     private static class ShardCreator implements Creator<Shard> {
 
         private static final long serialVersionUID = 1L;
@@ -598,20 +658,24 @@ public class Shard extends RaftActor {
         }
     }
 
-    @VisibleForTesting NormalizedNode readStore() throws ExecutionException, InterruptedException {
+    @VisibleForTesting
+    NormalizedNode<?,?> readStore(YangInstanceIdentifier id)
+            throws ExecutionException, InterruptedException {
         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
 
         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
-            transaction.read(YangInstanceIdentifier.builder().build());
+            transaction.read(id);
 
-        NormalizedNode<?, ?> node = future.get().get();
+        Optional<NormalizedNode<?, ?>> optional = future.get();
+        NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
 
         transaction.close();
 
         return node;
     }
 
-    @VisibleForTesting void writeToStore(YangInstanceIdentifier id, NormalizedNode node)
+    @VisibleForTesting
+    void writeToStore(YangInstanceIdentifier id, NormalizedNode<?,?> node)
         throws ExecutionException, InterruptedException {
         DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
 
@@ -620,4 +684,8 @@ public class Shard extends RaftActor {
         syncCommitTransaction(transaction);
     }
 
+    @VisibleForTesting
+    ShardStats getShardMBean() {
+        return shardMBean;
+    }
 }
index a97c00f..a8a1823 100644 (file)
@@ -15,11 +15,16 @@ import akka.actor.OneForOneStrategy;
 import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
 import akka.japi.Creator;
 import akka.japi.Function;
+import akka.japi.Procedure;
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.RecoveryFailure;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
-
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
@@ -32,14 +37,18 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
 
+import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * The ShardManager has the following jobs,
@@ -50,7 +59,10 @@ import java.util.Map;
  * <li> Monitor the cluster members and store their addresses
  * <ul>
  */
-public class ShardManager extends AbstractUntypedActorWithMetering {
+public class ShardManager extends AbstractUntypedPersistentActor {
+
+    protected final LoggingAdapter LOG =
+        Logging.getLogger(getContext().system(), this);
 
     // Stores a mapping between a member name and the address of the member
     // Member names look like "member-1", "member-2" etc and are as specified
@@ -74,6 +86,8 @@ public class ShardManager extends AbstractUntypedActorWithMetering {
 
     private final DatastoreContext datastoreContext;
 
+    private final Collection<String> knownModules = new HashSet<>(128);
+
     /**
      * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
      *             configuration or operational
@@ -105,7 +119,7 @@ public class ShardManager extends AbstractUntypedActorWithMetering {
     }
 
     @Override
-    public void handleReceive(Object message) throws Exception {
+    public void handleCommand(Object message) throws Exception {
         if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
             findPrimary(
                 FindPrimary.fromSerializable(message));
@@ -125,6 +139,23 @@ public class ShardManager extends AbstractUntypedActorWithMetering {
 
     }
 
+    @Override protected void handleRecover(Object message) throws Exception {
+
+        if(message instanceof SchemaContextModules){
+            SchemaContextModules msg = (SchemaContextModules) message;
+            knownModules.clear();
+            knownModules.addAll(msg.getModules());
+        } else if(message instanceof RecoveryFailure){
+            RecoveryFailure failure = (RecoveryFailure) message;
+            LOG.error(failure.cause(), "Recovery failed");
+        } else if(message instanceof RecoveryCompleted){
+            LOG.info("Recovery complete : {}", persistenceId());
+
+            // Delete all the messages from the akka journal except the last one
+            deleteMessages(lastSequenceNr() - 1);
+        }
+    }
+
     private void findLocalShard(FindLocalShard message) {
         ShardInformation shardInformation =
             localShards.get(message.getShardName());
@@ -159,16 +190,42 @@ public class ShardManager extends AbstractUntypedActorWithMetering {
      *
      * @param message
      */
-    private void updateSchemaContext(Object message) {
-        SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+    private void updateSchemaContext(final Object message) {
+        final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+
+        Set<ModuleIdentifier> allModuleIdentifiers = schemaContext.getAllModuleIdentifiers();
+        Set<String> newModules = new HashSet<>(128);
+
+        for(ModuleIdentifier moduleIdentifier : allModuleIdentifiers){
+            String s = moduleIdentifier.getNamespace().toString();
+            newModules.add(s);
+        }
+
+        if(newModules.containsAll(knownModules)) {
+
+            LOG.info("New SchemaContext has a super set of current knownModules - persisting info");
+
+            knownModules.clear();
+            knownModules.addAll(newModules);
+
+            persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
 
-        if(localShards.size() == 0){
-            createLocalShards(schemaContext);
+                @Override public void apply(SchemaContextModules param) throws Exception {
+                    LOG.info("Sending new SchemaContext to Shards");
+                    if (localShards.size() == 0) {
+                        createLocalShards(schemaContext);
+                    } else {
+                        for (ShardInformation info : localShards.values()) {
+                            info.getActor().tell(message, getSelf());
+                        }
+                    }
+                }
+
+            });
         } else {
-            for (ShardInformation info : localShards.values()) {
-                info.getActor().tell(message, getSelf());
-            }
+            LOG.info("Rejecting schema context update because it is not a super set of previously known modules");
         }
+
     }
 
     private void findPrimary(FindPrimary message) {
@@ -249,8 +306,8 @@ public class ShardManager extends AbstractUntypedActorWithMetering {
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
             Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
             ActorRef actor = getContext()
-                .actorOf(Shard.props(shardId, peerAddresses, datastoreContext, schemaContext).
-                    withMailbox(ActorContext.MAILBOX), shardId.toString());
+                .actorOf(Shard.props(shardId, peerAddresses, datastoreContext, schemaContext),
+                    shardId.toString());
             localShardActorNames.add(shardId.toString());
             localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
         }
@@ -306,6 +363,14 @@ public class ShardManager extends AbstractUntypedActorWithMetering {
 
     }
 
+    @Override public String persistenceId() {
+        return "shard-manager-" + type;
+    }
+
+    @VisibleForTesting public Collection<String> getKnownModules() {
+        return knownModules;
+    }
+
     private class ShardInformation {
         private final String shardName;
         private final ActorRef actor;
@@ -371,6 +436,18 @@ public class ShardManager extends AbstractUntypedActorWithMetering {
             return new ShardManager(type, cluster, configuration, datastoreContext);
         }
     }
+
+    static class SchemaContextModules implements Serializable {
+        private final Set<String> modules;
+
+        SchemaContextModules(Set<String> modules){
+            this.modules = modules;
+        }
+
+        public Set<String> getModules() {
+            return modules;
+        }
+    }
 }
 
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
new file mode 100644 (file)
index 0000000..8afdb4c
--- /dev/null
@@ -0,0 +1,157 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot
+ * and journal log entry batch are de-serialized and applied to their own write transaction
+ * instance in parallel on a thread pool for faster recovery time. However the transactions are
+ * committed to the data store in the order the corresponding snapshot or log batch are received
+ * to preserve data store integrity.
+ *
+ * @author Thomas Panetelis
+ */
+class ShardRecoveryCoordinator {
+
+    private static final int TIME_OUT = 10;
+
+    private static final Logger LOG = LoggerFactory.getLogger(ShardRecoveryCoordinator.class);
+
+    private final List<DOMStoreWriteTransaction> resultingTxList = Lists.newArrayList();
+    private final SchemaContext schemaContext;
+    private final String shardName;
+    private final ExecutorService executor;
+
+    ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext) {
+        this.schemaContext = schemaContext;
+        this.shardName = shardName;
+
+        executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
+                new ThreadFactoryBuilder().setDaemon(true)
+                        .setNameFormat("ShardRecovery-" + shardName + "-%d").build());
+    }
+
+    /**
+     * Submits a batch of journal log entries.
+     *
+     * @param logEntries the serialized journal log entries
+     * @param resultingTx the write Tx to which to apply the entries
+     */
+    void submit(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
+        LogRecoveryTask task = new LogRecoveryTask(logEntries, resultingTx);
+        resultingTxList.add(resultingTx);
+        executor.execute(task);
+    }
+
+    /**
+     * Submits a snapshot.
+     *
+     * @param snapshot the serialized snapshot
+     * @param resultingTx the write Tx to which to apply the entries
+     */
+    void submit(ByteString snapshot, DOMStoreWriteTransaction resultingTx) {
+        SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshot, resultingTx);
+        resultingTxList.add(resultingTx);
+        executor.execute(task);
+    }
+
+    Collection<DOMStoreWriteTransaction> getTransactions() {
+        // Shutdown the executor and wait for task completion.
+        executor.shutdown();
+
+        try {
+            if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES))  {
+                return resultingTxList;
+            } else {
+                LOG.error("Recovery for shard {} timed out after {} minutes", shardName, TIME_OUT);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+
+        return Collections.emptyList();
+    }
+
+    private static abstract class ShardRecoveryTask implements Runnable {
+
+        final DOMStoreWriteTransaction resultingTx;
+
+        ShardRecoveryTask(DOMStoreWriteTransaction resultingTx) {
+            this.resultingTx = resultingTx;
+        }
+    }
+
+    private class LogRecoveryTask extends ShardRecoveryTask {
+
+        private final List<Object> logEntries;
+
+        LogRecoveryTask(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
+            super(resultingTx);
+            this.logEntries = logEntries;
+        }
+
+        @Override
+        public void run() {
+            for(int i = 0; i < logEntries.size(); i++) {
+                MutableCompositeModification.fromSerializable(
+                        logEntries.get(i), schemaContext).apply(resultingTx);
+                // Null out to GC quicker.
+                logEntries.set(i, null);
+            }
+        }
+    }
+
+    private class SnapshotRecoveryTask extends ShardRecoveryTask {
+
+        private final ByteString snapshot;
+
+        SnapshotRecoveryTask(ByteString snapshot, DOMStoreWriteTransaction resultingTx) {
+            super(resultingTx);
+            this.snapshot = snapshot;
+        }
+
+        @Override
+        public void run() {
+            try {
+                NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
+                NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext).decode(
+                        YangInstanceIdentifier.builder().build(), serializedNode);
+
+                // delete everything first
+                resultingTx.delete(YangInstanceIdentifier.builder().build());
+
+                // Add everything from the remote node back
+                resultingTx.write(YangInstanceIdentifier.builder().build(), node);
+            } catch (InvalidProtocolBufferException e) {
+                LOG.error("Error deserializing snapshot", e);
+            }
+        }
+    }
+}
index e7a7aab..84614bd 100644 (file)
@@ -42,13 +42,16 @@ public class DistributedConfigDataStoreProviderModule extends
 
         DatastoreContext datastoreContext = new DatastoreContext("DistributedConfigDatastore",
                 InMemoryDOMDataStoreConfigProperties.create(
-                        props.getMaxShardDataChangeExecutorPoolSize().getValue(),
-                        props.getMaxShardDataChangeExecutorQueueSize().getValue(),
-                        props.getMaxShardDataChangeListenerQueueSize().getValue(),
-                        props.getMaxShardDataStoreExecutorQueueSize().getValue()),
+                        props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(),
+                        props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(),
+                        props.getMaxShardDataChangeListenerQueueSize().getValue().intValue(),
+                        props.getMaxShardDataStoreExecutorQueueSize().getValue().intValue()),
                 Duration.create(props.getShardTransactionIdleTimeoutInMinutes().getValue(),
                         TimeUnit.MINUTES),
-                props.getOperationTimeoutInSeconds().getValue());
+                props.getOperationTimeoutInSeconds().getValue(),
+                props.getShardJournalRecoveryLogBatchSize().getValue().intValue(),
+                props.getShardSnapshotBatchCount().getValue().intValue(),
+                props.getShardHearbeatIntervalInMillis().getValue());
 
         return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
                 datastoreContext, bundleContext);
index 814e6f6..3183527 100644 (file)
@@ -42,13 +42,16 @@ public class DistributedOperationalDataStoreProviderModule extends
 
         DatastoreContext datastoreContext = new DatastoreContext("DistributedOperationalDatastore",
                 InMemoryDOMDataStoreConfigProperties.create(
-                        props.getMaxShardDataChangeExecutorPoolSize().getValue(),
-                        props.getMaxShardDataChangeExecutorQueueSize().getValue(),
-                        props.getMaxShardDataChangeListenerQueueSize().getValue(),
-                        props.getMaxShardDataStoreExecutorQueueSize().getValue()),
+                        props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(),
+                        props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(),
+                        props.getMaxShardDataChangeListenerQueueSize().getValue().intValue(),
+                        props.getMaxShardDataStoreExecutorQueueSize().getValue().intValue()),
                 Duration.create(props.getShardTransactionIdleTimeoutInMinutes().getValue(),
                         TimeUnit.MINUTES),
-                props.getOperationTimeoutInSeconds().getValue());
+                props.getOperationTimeoutInSeconds().getValue(),
+                props.getShardJournalRecoveryLogBatchSize().getValue().intValue(),
+                props.getShardSnapshotBatchCount().getValue().intValue(),
+                props.getShardHearbeatIntervalInMillis().getValue());
 
         return DistributedDataStoreFactory.createInstance("operational",
                 getOperationalSchemaServiceDependency(), datastoreContext, bundleContext);
index e19a767..af43f95 100644 (file)
@@ -36,8 +36,8 @@ module distributed-datastore-provider {
                 config:java-name-prefix DistributedOperationalDataStoreProvider;
      }
 
-    typedef non-zero-uint16-type {
-        type uint16 {
+    typedef non-zero-uint32-type {
+        type uint32 {
             range "1..max";
         }
     }
@@ -48,43 +48,67 @@ module distributed-datastore-provider {
         }
     }
 
+    typedef heartbeat-interval-type {
+        type uint16 {
+            range "100..max";
+        }
+    }
+
     grouping data-store-properties {
         leaf max-shard-data-change-executor-queue-size {
             default 1000;
-            type non-zero-uint16-type;
+            type non-zero-uint32-type;
             description "The maximum queue size for each shard's data store data change notification executor.";
          }
 
          leaf max-shard-data-change-executor-pool-size {
             default 20;
-            type non-zero-uint16-type;
+            type non-zero-uint32-type;
             description "The maximum thread pool size for each shard's data store data change notification executor.";
          }
 
          leaf max-shard-data-change-listener-queue-size {
             default 1000;
-            type non-zero-uint16-type;
+            type non-zero-uint32-type;
             description "The maximum queue size for each shard's data store data change listeners.";
          }
 
          leaf max-shard-data-store-executor-queue-size {
             default 5000;
-            type non-zero-uint16-type;
+            type non-zero-uint32-type;
             description "The maximum queue size for each shard's data store executor.";
          }
 
          leaf shard-transaction-idle-timeout-in-minutes {
             default 10;
-            type non-zero-uint16-type;
+            type non-zero-uint32-type;
             description "The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs.";
          }
 
+         leaf shard-snapshot-batch-count {
+            default 20000;
+            type non-zero-uint32-type;
+            description "The minimum number of entries to be present in the in-memory journal log before a snapshot to be taken.";
+         }
+
+         leaf shard-hearbeat-interval-in-millis {
+            default 500;
+            type heartbeat-interval-type;
+            description "The interval at which a shard will send a heart beat message to its remote shard.";
+         }
+
          leaf operation-timeout-in-seconds {
             default 5;
             type operation-timeout-type;
             description "The maximum amount of time for akka operations (remote or local) to complete before failing.";
          }
 
+         leaf shard-journal-recovery-log-batch-size {
+            default 5000;
+            type non-zero-uint32-type;
+            description "The maximum number of journal log entries to batch on recovery for a shard before committing to the data store.";
+         }
+
          leaf enable-metric-capture {
             default false;
             type boolean;
@@ -93,7 +117,7 @@ module distributed-datastore-provider {
 
          leaf bounded-mailbox-capacity {
              default 1000;
-             type non-zero-uint16-type;
+             type non-zero-uint32-type;
              description "Max queue size that an actor's mailbox can reach";
          }
     }
index 022ef9b..fae21f2 100644 (file)
@@ -10,11 +10,10 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
-import org.apache.commons.io.FileUtils;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
-import java.io.File;
 import java.io.IOException;
 
 public abstract class AbstractActorTest {
@@ -25,35 +24,15 @@ public abstract class AbstractActorTest {
 
         System.setProperty("shard.persistent", "false");
         system = ActorSystem.create("test");
-
-        deletePersistenceFiles();
     }
 
     @AfterClass
     public static void tearDownClass() throws IOException {
         JavaTestKit.shutdownActorSystem(system);
         system = null;
-
-        deletePersistenceFiles();
-    }
-
-    protected static void deletePersistenceFiles() throws IOException {
-        File journal = new File("journal");
-
-        if(journal.exists()) {
-            FileUtils.deleteDirectory(journal);
-        }
-
-        File snapshots = new File("snapshots");
-
-        if(snapshots.exists()){
-            FileUtils.deleteDirectory(snapshots);
-        }
-
     }
 
     protected ActorSystem getSystem() {
         return system;
     }
-
 }
index 02201f7..8a3cdd0 100644 (file)
@@ -3,10 +3,23 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import akka.dispatch.Futures;
+import akka.japi.Procedure;
+import akka.persistence.PersistentConfirmation;
+import akka.persistence.PersistentId;
+import akka.persistence.PersistentImpl;
+import akka.persistence.PersistentRepr;
+import akka.persistence.journal.japi.AsyncWriteJournal;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
-import junit.framework.Assert;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
@@ -19,17 +32,41 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import scala.concurrent.duration.Duration;
+import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Future;
+
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 
 import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class ShardManagerTest {
     private static ActorSystem system;
 
     @BeforeClass
-    public static void setUp() {
-        system = ActorSystem.create("test");
+    public static void setUpClass() {
+        Map<String, String> myJournal = new HashMap<>();
+        myJournal.put("class", "org.opendaylight.controller.cluster.datastore.ShardManagerTest$MyJournal");
+        myJournal.put("plugin-dispatcher", "akka.actor.default-dispatcher");
+        Config config = ConfigFactory.load()
+            .withValue("akka.persistence.journal.plugin",
+                ConfigValueFactory.fromAnyRef("my-journal"))
+            .withValue("my-journal", ConfigValueFactory.fromMap(myJournal));
+
+        MyJournal.clear();
+
+        system = ActorSystem.create("test", config);
     }
 
     @AfterClass
@@ -38,29 +75,27 @@ public class ShardManagerTest {
         system = null;
     }
 
+    @Before
+    public void setUpTest(){
+        MyJournal.clear();
+    }
+
     @Test
     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
 
-        new JavaTestKit(system) {{
-            final Props props = ShardManager
-                .props("config", new MockClusterWrapper(),
-                    new MockConfiguration(), new DatastoreContext());
-            final TestActorRef<ShardManager> subject =
-                TestActorRef.create(system, props);
+        new JavaTestKit(system) {
+            {
+                final Props props = ShardManager
+                    .props("config", new MockClusterWrapper(),
+                        new MockConfiguration(), new DatastoreContext());
 
-            new Within(duration("10 seconds")) {
-                @Override
-                protected void run() {
+                final ActorRef subject = getSystem().actorOf(props);
 
-                    subject.tell(new FindPrimary("inventory").toSerializable(), getRef());
+                subject.tell(new FindPrimary("inventory").toSerializable(), getRef());
 
-                    expectMsgEquals(Duration.Zero(),
-                        new PrimaryNotFound("inventory").toSerializable());
-
-                    expectNoMsg();
-                }
-            };
-        }};
+                expectMsgEquals(duration("2 seconds"),
+                    new PrimaryNotFound("inventory").toSerializable());
+            }};
     }
 
     @Test
@@ -70,22 +105,14 @@ public class ShardManagerTest {
             final Props props = ShardManager
                 .props("config", new MockClusterWrapper(),
                     new MockConfiguration(), new DatastoreContext());
-            final TestActorRef<ShardManager> subject =
-                TestActorRef.create(system, props);
-
-            subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-            new Within(duration("10 seconds")) {
-                @Override
-                protected void run() {
+            final ActorRef subject = getSystem().actorOf(props);
 
-                    subject.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
+            subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-                    expectMsgClass(duration("1 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+            subject.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
 
-                    expectNoMsg();
-                }
-            };
+            expectMsgClass(duration("1 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
         }};
     }
 
@@ -96,31 +123,23 @@ public class ShardManagerTest {
             final Props props = ShardManager
                 .props("config", new MockClusterWrapper(),
                     new MockConfiguration(), new DatastoreContext());
-            final TestActorRef<ShardManager> subject =
-                TestActorRef.create(system, props);
 
-            new Within(duration("10 seconds")) {
-                @Override
-                protected void run() {
-
-                    subject.tell(new FindLocalShard("inventory"), getRef());
-
-                    final String out = new ExpectMsg<String>(duration("10 seconds"), "find local") {
-                        @Override
-                        protected String match(Object in) {
-                            if (in instanceof LocalShardNotFound) {
-                                return ((LocalShardNotFound) in).getShardName();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get(); // this extracts the received message
+            final ActorRef subject = getSystem().actorOf(props);
 
-                    assertEquals("inventory", out);
+            subject.tell(new FindLocalShard("inventory"), getRef());
 
-                    expectNoMsg();
+            final String out = new ExpectMsg<String>(duration("3 seconds"), "find local") {
+                @Override
+                protected String match(Object in) {
+                    if (in instanceof LocalShardNotFound) {
+                        return ((LocalShardNotFound) in).getShardName();
+                    } else {
+                        throw noMatch();
+                    }
                 }
-            };
+            }.get(); // this extracts the received message
+
+            assertEquals("inventory", out);
         }};
     }
 
@@ -133,40 +152,109 @@ public class ShardManagerTest {
             final Props props = ShardManager
                 .props("config", mockClusterWrapper,
                     new MockConfiguration(), new DatastoreContext());
-            final TestActorRef<ShardManager> subject =
-                TestActorRef.create(system, props);
+
+            final ActorRef subject = getSystem().actorOf(props);
 
             subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-            new Within(duration("10 seconds")) {
+            subject.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
+
+            final ActorRef out = new ExpectMsg<ActorRef>(duration("3 seconds"), "find local") {
                 @Override
-                protected void run() {
-
-                    subject.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
-
-                    final ActorRef out = new ExpectMsg<ActorRef>(duration("10 seconds"), "find local") {
-                        @Override
-                        protected ActorRef match(Object in) {
-                            if (in instanceof LocalShardFound) {
-                                return ((LocalShardFound) in).getPath();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get(); // this extracts the received message
+                protected ActorRef match(Object in) {
+                    if (in instanceof LocalShardFound) {
+                        return ((LocalShardFound) in).getPath();
+                    } else {
+                        throw noMatch();
+                    }
+                }
+            }.get(); // this extracts the received message
+
+            assertTrue(out.path().toString(),
+                out.path().toString().contains("member-1-shard-default-config"));
+        }};
+    }
+
+    @Test
+    public void testOnReceiveMemberUp() throws Exception {
+
+        new JavaTestKit(system) {{
+            final Props props = ShardManager
+                .props("config", new MockClusterWrapper(),
+                    new MockConfiguration(), new DatastoreContext());
 
-                    assertTrue(out.path().toString(), out.path().toString().contains("member-1-shard-default-config"));
+            final ActorRef subject = getSystem().actorOf(props);
 
+            MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
 
-                    expectNoMsg();
+            subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+
+            final String out = new ExpectMsg<String>(duration("3 seconds"), "primary found") {
+                // do not put code outside this method, will run afterwards
+                @Override
+                protected String match(Object in) {
+                    if (in.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
+                        PrimaryFound f = PrimaryFound.fromSerializable(in);
+                        return f.getPrimaryPath();
+                    } else {
+                        throw noMatch();
+                    }
                 }
-            };
+            }.get(); // this extracts the received message
+
+            assertTrue(out, out.contains("member-2-shard-astronauts-config"));
         }};
     }
 
     @Test
-    public void testOnReceiveMemberUp() throws Exception {
+    public void testOnReceiveMemberDown() throws Exception {
 
+        new JavaTestKit(system) {{
+            final Props props = ShardManager
+                .props("config", new MockClusterWrapper(),
+                    new MockConfiguration(), new DatastoreContext());
+
+            final ActorRef subject = getSystem().actorOf(props);
+
+            MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
+
+            subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+
+            expectMsgClass(duration("3 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+
+            MockClusterWrapper.sendMemberRemoved(subject, "member-2", getRef().path().toString());
+
+            subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+
+            expectMsgClass(duration("1 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
+        }};
+    }
+
+    @Test
+    public void testOnRecoveryJournalIsEmptied(){
+        MyJournal.addToJournal(1L, new ShardManager.SchemaContextModules(
+            ImmutableSet.of("foo")));
+
+        assertEquals(1, MyJournal.get().size());
+
+        new JavaTestKit(system) {{
+            final Props props = ShardManager
+                .props("config", new MockClusterWrapper(),
+                    new MockConfiguration(), new DatastoreContext());
+
+            final ActorRef subject = getSystem().actorOf(props);
+
+            // Send message to check that ShardManager is ready
+            subject.tell(new FindPrimary("unknown").toSerializable(), getRef());
+
+            expectMsgClass(duration("3 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
+
+            assertEquals(0, MyJournal.get().size());
+        }};
+    }
+
+    @Test
+    public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
         new JavaTestKit(system) {{
             final Props props = ShardManager
                 .props("config", new MockClusterWrapper(),
@@ -174,39 +262,63 @@ public class ShardManagerTest {
             final TestActorRef<ShardManager> subject =
                 TestActorRef.create(system, props);
 
-            // the run() method needs to finish within 3 seconds
-            new Within(duration("10 seconds")) {
-                @Override
-                protected void run() {
-
-                    MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
-
-                    subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
-
-                    final String out = new ExpectMsg<String>(duration("1 seconds"), "primary found") {
-                        // do not put code outside this method, will run afterwards
-                        @Override
-                        protected String match(Object in) {
-                            if (in.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
-                                PrimaryFound f = PrimaryFound.fromSerializable(in);
-                                return f.getPrimaryPath();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get(); // this extracts the received message
+            subject.underlyingActor().onReceiveRecover(new ShardManager.SchemaContextModules(ImmutableSet.of("foo")));
 
-                    Assert.assertTrue(out, out.contains("member-2-shard-astronauts-config"));
+            Collection<String> knownModules = subject.underlyingActor().getKnownModules();
 
-                    expectNoMsg();
-                }
-            };
+            assertTrue(knownModules.contains("foo"));
         }};
     }
 
     @Test
-    public void testOnReceiveMemberDown() throws Exception {
+    public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
+        throws Exception {
+        new JavaTestKit(system) {{
+            final Props props = ShardManager
+                .props("config", new MockClusterWrapper(),
+                    new MockConfiguration(), new DatastoreContext());
+            final TestActorRef<ShardManager> subject =
+                TestActorRef.create(system, props);
+
+            Collection<String> knownModules = subject.underlyingActor().getKnownModules();
+
+            assertEquals(0, knownModules.size());
+
+            SchemaContext schemaContext = mock(SchemaContext.class);
+            Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
+
+            ModuleIdentifier foo = mock(ModuleIdentifier.class);
+            when(foo.getNamespace()).thenReturn(new URI("foo"));
+
+            moduleIdentifierSet.add(foo);
+
+            when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
+
+            subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+            assertTrue(knownModules.contains("foo"));
+
+            assertEquals(1, knownModules.size());
+
+            ModuleIdentifier bar = mock(ModuleIdentifier.class);
+            when(bar.getNamespace()).thenReturn(new URI("bar"));
+
+            moduleIdentifierSet.add(bar);
+
+            subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+            assertTrue(knownModules.contains("bar"));
 
+            assertEquals(2, knownModules.size());
+
+        }};
+
+    }
+
+
+    @Test
+    public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
+        throws Exception {
         new JavaTestKit(system) {{
             final Props props = ShardManager
                 .props("config", new MockClusterWrapper(),
@@ -214,28 +326,117 @@ public class ShardManagerTest {
             final TestActorRef<ShardManager> subject =
                 TestActorRef.create(system, props);
 
-            // the run() method needs to finish within 3 seconds
-            new Within(duration("10 seconds")) {
-                @Override
-                protected void run() {
+            Collection<String> knownModules = subject.underlyingActor().getKnownModules();
 
-                    MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
+            assertEquals(0, knownModules.size());
 
-                    subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+            SchemaContext schemaContext = mock(SchemaContext.class);
+            Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
 
-                    expectMsgClass(duration("1 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+            ModuleIdentifier foo = mock(ModuleIdentifier.class);
+            when(foo.getNamespace()).thenReturn(new URI("foo"));
 
-                    MockClusterWrapper.sendMemberRemoved(subject, "member-2", getRef().path().toString());
+            moduleIdentifierSet.add(foo);
 
-                    subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+            when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
 
-                    expectMsgClass(duration("1 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
+            subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+            assertTrue(knownModules.contains("foo"));
+
+            assertEquals(1, knownModules.size());
+
+            //Create a completely different SchemaContext with only the bar module in it
+            schemaContext = mock(SchemaContext.class);
+            moduleIdentifierSet = new HashSet<>();
+            ModuleIdentifier bar = mock(ModuleIdentifier.class);
+            when(bar.getNamespace()).thenReturn(new URI("bar"));
+
+            moduleIdentifierSet.add(bar);
+
+            subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+            assertFalse(knownModules.contains("bar"));
+
+            assertEquals(1, knownModules.size());
 
-                    expectNoMsg();
-                }
-            };
         }};
+
+    }
+
+
+    private void sleep(long period){
+        Uninterruptibles.sleepUninterruptibly(period, TimeUnit.MILLISECONDS);
     }
 
+    public static class MyJournal extends AsyncWriteJournal {
+
+        private static Map<Long, Object> journal = Maps.newTreeMap();
+
+        public static void addToJournal(Long sequenceNr, Object value){
+            journal.put(sequenceNr, value);
+        }
+
+        public static Map<Long, Object> get(){
+            return journal;
+        }
+
+        public static void clear(){
+            journal.clear();
+        }
 
+        @Override public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, long toSequenceNr, long max,
+            final Procedure<PersistentRepr> replayCallback) {
+            if(journal.size() == 0){
+                return Futures.successful(null);
+            }
+            return Futures.future(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    for (Map.Entry<Long, Object> entry : journal.entrySet()) {
+                        PersistentRepr persistentMessage =
+                            new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId,
+                                false, null, null);
+                        replayCallback.apply(persistentMessage);
+                    }
+                    return null;
+                }
+            }, context().dispatcher());
+        }
+
+        @Override public Future<Long> doAsyncReadHighestSequenceNr(String s, long l) {
+            return Futures.successful(-1L);
+        }
+
+        @Override public Future<Void> doAsyncWriteMessages(
+            final Iterable<PersistentRepr> persistentReprs) {
+            return Futures.future(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    for (PersistentRepr repr : persistentReprs){
+                        if(repr.payload() instanceof ShardManager.SchemaContextModules) {
+                            journal.put(repr.sequenceNr(), repr.payload());
+                        }
+                    }
+                    return null;
+                }
+            }, context().dispatcher());
+        }
+
+        @Override public Future<Void> doAsyncWriteConfirmations(
+            Iterable<PersistentConfirmation> persistentConfirmations) {
+            return Futures.successful(null);
+        }
+
+        @Override public Future<Void> doAsyncDeleteMessages(Iterable<PersistentId> persistentIds,
+            boolean b) {
+            clear();
+            return Futures.successful(null);
+        }
+
+        @Override public Future<Void> doAsyncDeleteMessagesTo(String s, long l, boolean b) {
+            clear();
+            return Futures.successful(null);
+        }
+    }
 }
index deb71c2..a3e0b3a 100644 (file)
@@ -4,23 +4,43 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.event.Logging;
+import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
+import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
@@ -28,222 +48,138 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.duration.Duration;
 import java.io.IOException;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-
+import java.util.concurrent.TimeUnit;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
 
 public class ShardTest extends AbstractActorTest {
 
-    private static final DatastoreContext DATA_STORE_CONTEXT = new DatastoreContext();
+    private static final DatastoreContext DATA_STORE_CONTEXT =
+            new DatastoreContext("", null, Duration.create(10, TimeUnit.MINUTES), 5, 3, 5000, 500);
 
-    @Test
-    public void testOnReceiveRegisterListener() throws Exception {
-        new JavaTestKit(getSystem()) {{
-            final ShardIdentifier identifier =
-                ShardIdentifier.builder().memberName("member-1")
-                    .shardName("inventory").type("config").build();
+    private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
 
-            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
-            final ActorRef subject =
-                getSystem().actorOf(props, "testRegisterChangeListener");
+    private static final ShardIdentifier IDENTIFIER = ShardIdentifier.builder().memberName("member-1")
+            .shardName("inventory").type("config").build();
 
-            new Within(duration("3 seconds")) {
-                @Override
-                protected void run() {
+    @Before
+    public void setUp() {
+        System.setProperty("shard.persistent", "false");
 
-                    subject.tell(
-                        new UpdateSchemaContext(SchemaContextHelper.full()),
-                        getRef());
+        InMemorySnapshotStore.clear();
+        InMemoryJournal.clear();
+    }
 
-                    subject.tell(new RegisterChangeListener(TestModel.TEST_PATH,
-                        getRef().path(), AsyncDataBroker.DataChangeScope.BASE),
-                        getRef());
+    @After
+    public void tearDown() {
+        InMemorySnapshotStore.clear();
+        InMemoryJournal.clear();
+    }
 
-                    final Boolean notificationEnabled = new ExpectMsg<Boolean>(
-                                                   duration("3 seconds"), "enable notification") {
-                        // do not put code outside this method, will run afterwards
-                        @Override
-                        protected Boolean match(Object in) {
-                            if(in instanceof EnableNotification){
-                                return ((EnableNotification) in).isEnabled();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get(); // this extracts the received message
-
-                    assertFalse(notificationEnabled);
-
-                    final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
-                        // do not put code outside this method, will run afterwards
-                        @Override
-                        protected String match(Object in) {
-                            if (in.getClass().equals(RegisterChangeListenerReply.class)) {
-                                RegisterChangeListenerReply reply =
-                                    (RegisterChangeListenerReply) in;
-                                return reply.getListenerRegistrationPath()
-                                    .toString();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get(); // this extracts the received message
+    private Props newShardProps() {
+        return Shard.props(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
+                DATA_STORE_CONTEXT, SCHEMA_CONTEXT);
+    }
 
-                    assertTrue(out.matches(
-                        "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
-                }
+    @Test
+    public void testOnReceiveRegisterListener() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            ActorRef subject = getSystem().actorOf(newShardProps(), "testRegisterChangeListener");
 
+            subject.tell(new UpdateSchemaContext(SchemaContextHelper.full()), getRef());
 
-            };
+            subject.tell(new RegisterChangeListener(TestModel.TEST_PATH,
+                    getRef().path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
+
+            EnableNotification enable = expectMsgClass(duration("3 seconds"), EnableNotification.class);
+            assertEquals("isEnabled", false, enable.isEnabled());
+
+            RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
+                    RegisterChangeListenerReply.class);
+            assertTrue(reply.getListenerRegistrationPath().toString().matches(
+                    "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
         }};
     }
 
     @Test
     public void testCreateTransaction(){
-        new JavaTestKit(getSystem()) {{
-            final ShardIdentifier identifier =
-                ShardIdentifier.builder().memberName("member-1")
-                    .shardName("inventory").type("config").build();
-
-            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
-            final ActorRef subject =
-                getSystem().actorOf(props, "testCreateTransaction");
-
-            // Wait for a specific log message to show up
-            final boolean result =
-                new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
-                ) {
-                    @Override
-                    protected Boolean run() {
-                        return true;
-                    }
-                }.from(subject.path().toString())
-                    .message("Switching from state Candidate to Leader")
-                    .occurrences(1).exec();
-
-            Assert.assertEquals(true, result);
+        new ShardTestKit(getSystem()) {{
+            ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateTransaction");
 
-            new Within(duration("3 seconds")) {
-                @Override
-                protected void run() {
+            waitUntilLeader(subject);
 
-                    subject.tell(
-                        new UpdateSchemaContext(TestModel.createTestContext()),
-                        getRef());
+            subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-                    subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(),
-                        getRef());
+            subject.tell(new CreateTransaction("txn-1",
+                    TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
 
-                    final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
-                        // do not put code outside this method, will run afterwards
-                        @Override
-                        protected String match(Object in) {
-                            if (in instanceof CreateTransactionReply) {
-                                CreateTransactionReply reply =
-                                    (CreateTransactionReply) in;
-                                return reply.getTransactionActorPath()
-                                    .toString();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get(); // this extracts the received message
+            CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
+                    CreateTransactionReply.class);
 
-                    assertTrue("Unexpected transaction path " + out,
-                        out.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
-                    expectNoMsg();
-                }
-            };
+            String path = reply.getTransactionActorPath().toString();
+            assertTrue("Unexpected transaction path " + path,
+                    path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
+            expectNoMsg();
         }};
     }
 
     @Test
     public void testCreateTransactionOnChain(){
-        new JavaTestKit(getSystem()) {{
-            final ShardIdentifier identifier =
-                ShardIdentifier.builder().memberName("member-1")
-                    .shardName("inventory").type("config").build();
-
-            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
-            final ActorRef subject =
-                getSystem().actorOf(props, "testCreateTransactionOnChain");
-
-            // Wait for a specific log message to show up
-            final boolean result =
-                new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
-                ) {
-                    @Override
-                    protected Boolean run() {
-                        return true;
-                    }
-                }.from(subject.path().toString())
-                    .message("Switching from state Candidate to Leader")
-                    .occurrences(1).exec();
-
-            Assert.assertEquals(true, result);
-
-            new Within(duration("3 seconds")) {
-                @Override
-                protected void run() {
+        new ShardTestKit(getSystem()) {{
+            final ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
 
-                    subject.tell(
-                        new UpdateSchemaContext(TestModel.createTestContext()),
-                        getRef());
+            waitUntilLeader(subject);
 
-                    subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
-                        getRef());
+            subject.tell(new CreateTransaction("txn-1",
+                    TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
+                    getRef());
 
-                    final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
-                        // do not put code outside this method, will run afterwards
-                        @Override
-                        protected String match(Object in) {
-                            if (in instanceof CreateTransactionReply) {
-                                CreateTransactionReply reply =
-                                    (CreateTransactionReply) in;
-                                return reply.getTransactionActorPath()
-                                    .toString();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get(); // this extracts the received message
+            CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
+                    CreateTransactionReply.class);
 
-                    assertTrue("Unexpected transaction path " + out,
-                        out.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
-                    expectNoMsg();
-                }
-            };
+            String path = reply.getTransactionActorPath().toString();
+            assertTrue("Unexpected transaction path " + path,
+                    path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
+            expectNoMsg();
         }};
     }
 
     @Test
     public void testPeerAddressResolved(){
         new JavaTestKit(getSystem()) {{
-            Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
-
             final ShardIdentifier identifier =
                 ShardIdentifier.builder().memberName("member-1")
                     .shardName("inventory").type("config").build();
 
-            peerAddresses.put(identifier, null);
-            final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT, TestModel.createTestContext());
-            final ActorRef subject =
-                getSystem().actorOf(props, "testPeerAddressResolved");
+            Props props = Shard.props(identifier,
+                    Collections.<ShardIdentifier, String>singletonMap(identifier, null),
+                    DATA_STORE_CONTEXT, SCHEMA_CONTEXT);
+            final ActorRef subject = getSystem().actorOf(props, "testPeerAddressResolved");
 
             new Within(duration("3 seconds")) {
                 @Override
@@ -261,99 +197,205 @@ public class ShardTest extends AbstractActorTest {
 
     @Test
     public void testApplySnapshot() throws ExecutionException, InterruptedException {
-        Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
+        TestActorRef<Shard> ref = TestActorRef.create(getSystem(), newShardProps());
 
-        final ShardIdentifier identifier =
-            ShardIdentifier.builder().memberName("member-1")
-                .shardName("inventory").type("config").build();
+        NormalizedNodeToNodeCodec codec =
+            new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
 
-        peerAddresses.put(identifier, null);
-        final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT, TestModel.createTestContext());
+        ref.underlyingActor().writeToStore(TestModel.TEST_PATH, ImmutableNodes.containerNode(
+                TestModel.TEST_QNAME));
 
-        TestActorRef<Shard> ref = TestActorRef.create(getSystem(), props);
+        YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
+        NormalizedNode<?,?> expected = ref.underlyingActor().readStore(root);
 
-        ref.underlyingActor().updateSchemaContext(TestModel.createTestContext());
+        NormalizedNodeMessages.Container encode = codec.encode(root, expected);
 
-        NormalizedNodeToNodeCodec codec =
-            new NormalizedNodeToNodeCodec(TestModel.createTestContext());
+        ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
+                encode.getNormalizedNode().toByteString().toByteArray(),
+                Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
 
-        ref.underlyingActor().writeToStore(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+        ref.underlyingActor().onReceiveCommand(applySnapshot);
 
-        NormalizedNode expected = ref.underlyingActor().readStore();
+        NormalizedNode<?,?> actual = ref.underlyingActor().readStore(root);
 
-        NormalizedNodeMessages.Container encode = codec
-            .encode(YangInstanceIdentifier.builder().build(), expected);
+        assertEquals(expected, actual);
+    }
 
+    @Test
+    public void testApplyState() throws Exception {
 
-        ref.underlyingActor().applySnapshot(encode.getNormalizedNode().toByteString());
+        TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps());
 
-        NormalizedNode actual = ref.underlyingActor().readStore();
+        NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        assertEquals(expected, actual);
-    }
+        MutableCompositeModification compMod = new MutableCompositeModification();
+        compMod.addModification(new WriteModification(TestModel.TEST_PATH, node, SCHEMA_CONTEXT));
+        Payload payload = new CompositeModificationPayload(compMod.toSerializable());
+        ApplyState applyState = new ApplyState(null, "test",
+                new ReplicatedLogImplEntry(1, 2, payload));
 
-    private static class ShardTestKit extends JavaTestKit {
+        shard.underlyingActor().onReceiveCommand(applyState);
 
-        private ShardTestKit(ActorSystem actorSystem) {
-            super(actorSystem);
+        NormalizedNode<?,?> actual = shard.underlyingActor().readStore(TestModel.TEST_PATH);
+        assertEquals("Applied state", node, actual);
+    }
+
+    @SuppressWarnings("serial")
+    @Test
+    public void testRecovery() throws Exception {
+
+        // Set up the InMemorySnapshotStore.
+
+        InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
+        testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
+
+        DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction();
+        writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+        DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready();
+        commitCohort.preCommit().get();
+        commitCohort.commit().get();
+
+        DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
+        NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
+
+        InMemorySnapshotStore.addSnapshot(IDENTIFIER.toString(), Snapshot.create(
+                new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
+                        YangInstanceIdentifier.builder().build(), root).
+                                getNormalizedNode().toByteString().toByteArray(),
+                                Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+
+        // Set up the InMemoryJournal.
+
+        InMemoryJournal.addEntry(IDENTIFIER.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
+                  new WriteModification(TestModel.OUTER_LIST_PATH,
+                          ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
+                          SCHEMA_CONTEXT))));
+
+        int nListEntries = 11;
+        Set<Integer> listEntryKeys = new HashSet<>();
+        for(int i = 1; i <= nListEntries; i++) {
+            listEntryKeys.add(Integer.valueOf(i));
+            YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
+            Modification mod = new MergeModification(path,
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
+                    SCHEMA_CONTEXT);
+            InMemoryJournal.addEntry(IDENTIFIER.toString(), i, new ReplicatedLogImplEntry(i, 1,
+                    newPayload(mod)));
         }
 
-        protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
-            // Wait for a specific log message to show up
-            final boolean result =
-                new JavaTestKit.EventFilter<Boolean>(logLevel
-                ) {
+        InMemoryJournal.addEntry(IDENTIFIER.toString(), nListEntries + 1,
+                new ApplyLogEntries(nListEntries));
+
+        // Create the actor and wait for recovery complete.
+
+        final CountDownLatch recoveryComplete = new CountDownLatch(1);
+
+        Creator<Shard> creator = new Creator<Shard>() {
+            @Override
+            public Shard create() throws Exception {
+                return new Shard(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
+                        DATA_STORE_CONTEXT, SCHEMA_CONTEXT) {
                     @Override
-                    protected Boolean run() {
-                        return true;
+                    protected void onRecoveryComplete() {
+                        try {
+                            super.onRecoveryComplete();
+                        } finally {
+                            recoveryComplete.countDown();
+                        }
                     }
-                }.from(subject.path().toString())
-                    .message(logMessage)
-                    .occurrences(1).exec();
+                };
+            }
+        };
 
-            Assert.assertEquals(true, result);
+        TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                Props.create(new DelegatingShardCreator(creator)), "testRecovery");
+
+        assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
+
+        // Verify data in the data store.
+
+        NormalizedNode<?, ?> outerList = shard.underlyingActor().readStore(TestModel.OUTER_LIST_PATH);
+        assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
+        assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
+                outerList.getValue() instanceof Iterable);
+        for(Object entry: (Iterable<?>) outerList.getValue()) {
+            assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
+                    entry instanceof MapEntryNode);
+            MapEntryNode mapEntry = (MapEntryNode)entry;
+            Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
+                    mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
+            assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
+            Object value = idLeaf.get().getValue();
+            assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
+                    listEntryKeys.remove(value));
+        }
 
+        if(!listEntryKeys.isEmpty()) {
+            fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
+                    listEntryKeys);
         }
 
+        assertEquals("Last log index", nListEntries,
+                shard.underlyingActor().getShardMBean().getLastLogIndex());
+        assertEquals("Commit index", nListEntries,
+                shard.underlyingActor().getShardMBean().getCommitIndex());
+        assertEquals("Last applied", nListEntries,
+                shard.underlyingActor().getShardMBean().getLastApplied());
     }
 
+    private CompositeModificationPayload newPayload(Modification... mods) {
+        MutableCompositeModification compMod = new MutableCompositeModification();
+        for(Modification mod: mods) {
+            compMod.addModification(mod);
+        }
+
+        return new CompositeModificationPayload(compMod.toSerializable());
+    }
+
+    @SuppressWarnings("unchecked")
     @Test
-    public void testCreateSnapshot() throws IOException, InterruptedException {
+    public void testForwardedCommitTransactionWithPersistence() throws IOException {
+        System.setProperty("shard.persistent", "true");
+
         new ShardTestKit(getSystem()) {{
-            final ShardIdentifier identifier =
-                ShardIdentifier.builder().memberName("member-1")
-                    .shardName("inventory").type("config").build();
+            TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps());
 
-            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
-            final ActorRef subject =
-                getSystem().actorOf(props, "testCreateSnapshot");
+            waitUntilLeader(shard);
 
-            // Wait for a specific log message to show up
-            this.waitForLogMessage(Logging.Info.class, subject, "Switching from state Candidate to Leader");
+            NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
+            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class);
+            doReturn(Futures.immediateFuture(null)).when(cohort).commit();
 
-            new Within(duration("3 seconds")) {
-                @Override
-                protected void run() {
+            MutableCompositeModification modification = new MutableCompositeModification();
+            modification.addModification(new WriteModification(TestModel.TEST_PATH, node,
+                    SCHEMA_CONTEXT));
 
-                    subject.tell(
-                        new UpdateSchemaContext(TestModel.createTestContext()),
-                        getRef());
+            shard.tell(new ForwardedCommitTransaction(cohort, modification), getRef());
 
-                    subject.tell(new CaptureSnapshot(-1,-1,-1,-1),
-                        getRef());
+            expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
 
-                    waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
+            verify(cohort).commit();
 
-                    subject.tell(new CaptureSnapshot(-1,-1,-1,-1),
-                        getRef());
+            assertEquals("Last log index", 0, shard.underlyingActor().getShardMBean().getLastLogIndex());
+        }};
+    }
 
-                    waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
+    @Test
+    public void testCreateSnapshot() throws IOException, InterruptedException {
+        new ShardTestKit(getSystem()) {{
+            final ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateSnapshot");
 
-                }
-            };
+            waitUntilLeader(subject);
+
+            subject.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
+
+            waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
+
+            subject.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
 
-            deletePersistenceFiles();
+            waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
         }};
     }
 
@@ -366,7 +408,7 @@ public class ShardTest extends AbstractActorTest {
         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.listeningDecorator(
             MoreExecutors.sameThreadExecutor()), MoreExecutors.sameThreadExecutor());
 
-        store.onGlobalContextUpdated(TestModel.createTestContext());
+        store.onGlobalContextUpdated(SCHEMA_CONTEXT);
 
         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
         putTransaction.write(TestModel.TEST_PATH,
@@ -424,4 +466,46 @@ public class ShardTest extends AbstractActorTest {
             }
         };
     }
+
+    private static final class DelegatingShardCreator implements Creator<Shard> {
+        private final Creator<Shard> delegate;
+
+        DelegatingShardCreator(Creator<Shard> delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public Shard create() throws Exception {
+            return delegate.create();
+        }
+    }
+
+    private static class ShardTestKit extends JavaTestKit {
+
+        private ShardTestKit(ActorSystem actorSystem) {
+            super(actorSystem);
+        }
+
+        protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
+            // Wait for a specific log message to show up
+            final boolean result =
+                new JavaTestKit.EventFilter<Boolean>(logLevel
+                ) {
+                    @Override
+                    protected Boolean run() {
+                        return true;
+                    }
+                }.from(subject.path().toString())
+                    .message(logMessage)
+                    .occurrences(1).exec();
+
+            Assert.assertEquals(true, result);
+
+        }
+
+        protected void waitUntilLeader(ActorRef subject) {
+            waitForLogMessage(Logging.Info.class, subject,
+                    "Switching from state Candidate to Leader");
+        }
+    }
 }
index 0beb00b..3f31591 100644 (file)
@@ -503,7 +503,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
         datastoreContext = new DatastoreContext("Test",
                 InMemoryDOMDataStoreConfigProperties.getDefault(),
-                Duration.create(500, TimeUnit.MILLISECONDS), 5);
+                Duration.create(500, TimeUnit.MILLISECONDS), 5, 1000, 1000, 500);
 
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemoryJournal.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemoryJournal.java
new file mode 100644 (file)
index 0000000..c9a0eaf
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.collect.Maps;
+import scala.concurrent.Future;
+import akka.dispatch.Futures;
+import akka.japi.Procedure;
+import akka.persistence.PersistentConfirmation;
+import akka.persistence.PersistentId;
+import akka.persistence.PersistentImpl;
+import akka.persistence.PersistentRepr;
+import akka.persistence.journal.japi.AsyncWriteJournal;
+
+public class InMemoryJournal extends AsyncWriteJournal {
+
+    private static Map<String, Map<Long, Object>> journals = new ConcurrentHashMap<>();
+
+    public static void addEntry(String persistenceId, long sequenceNr, Object data) {
+        Map<Long, Object> journal = journals.get(persistenceId);
+        if(journal == null) {
+            journal = Maps.newLinkedHashMap();
+            journals.put(persistenceId, journal);
+        }
+
+        journal.put(sequenceNr, data);
+    }
+
+    public static void clear() {
+        journals.clear();
+    }
+
+    @Override
+    public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr,
+            long toSequenceNr, long max, final Procedure<PersistentRepr> replayCallback) {
+        return Futures.future(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                Map<Long, Object> journal = journals.get(persistenceId);
+                if(journal == null) {
+                    return null;
+                }
+
+                for (Map.Entry<Long,Object> entry : journal.entrySet()) {
+                    PersistentRepr persistentMessage =
+                        new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId, false, null, null);
+                    replayCallback.apply(persistentMessage);
+                }
+
+                return null;
+            }
+        }, context().dispatcher());
+    }
+
+    @Override
+    public Future<Long> doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) {
+        return Futures.successful(new Long(0));
+    }
+
+    @Override
+    public Future<Void> doAsyncWriteMessages(Iterable<PersistentRepr> messages) {
+        return Futures.successful(null);
+    }
+
+    @Override
+    public Future<Void> doAsyncWriteConfirmations(Iterable<PersistentConfirmation> confirmations) {
+        return Futures.successful(null);
+    }
+
+    @Override
+    public Future<Void> doAsyncDeleteMessages(Iterable<PersistentId> messageIds, boolean permanent) {
+        return Futures.successful(null);
+    }
+
+    @Override
+    public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) {
+        return Futures.successful(null);
+    }
+}
index 0e492f0..22e522b 100644 (file)
@@ -16,46 +16,66 @@ import akka.persistence.SnapshotSelectionCriteria;
 import akka.persistence.snapshot.japi.SnapshotStore;
 import com.google.common.collect.Iterables;
 import scala.concurrent.Future;
-
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.opendaylight.controller.cluster.raft.Snapshot;
 
 public class InMemorySnapshotStore extends SnapshotStore {
 
-    Map<String, List<Snapshot>> snapshots = new HashMap<>();
+    private static Map<String, List<StoredSnapshot>> snapshots = new ConcurrentHashMap<>();
+
+    public static void addSnapshot(String persistentId, Snapshot snapshot) {
+        List<StoredSnapshot> snapshotList = snapshots.get(persistentId);
+
+        if(snapshotList == null) {
+            snapshotList = new ArrayList<>();
+            snapshots.put(persistentId, snapshotList);
+        }
+
+        snapshotList.add(new StoredSnapshot(new SnapshotMetadata(persistentId, snapshotList.size(),
+                System.currentTimeMillis()), snapshot));
+    }
+
+    public static void clear() {
+        snapshots.clear();
+    }
 
-    @Override public Future<Option<SelectedSnapshot>> doLoadAsync(String s,
+    @Override
+    public Future<Option<SelectedSnapshot>> doLoadAsync(String s,
         SnapshotSelectionCriteria snapshotSelectionCriteria) {
-        List<Snapshot> snapshotList = snapshots.get(s);
+        List<StoredSnapshot> snapshotList = snapshots.get(s);
         if(snapshotList == null){
             return Futures.successful(Option.<SelectedSnapshot>none());
         }
 
-        Snapshot snapshot = Iterables.getLast(snapshotList);
+        StoredSnapshot snapshot = Iterables.getLast(snapshotList);
         SelectedSnapshot selectedSnapshot =
             new SelectedSnapshot(snapshot.getMetadata(), snapshot.getData());
         return Futures.successful(Option.some(selectedSnapshot));
     }
 
-    @Override public Future<Void> doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) {
-        List<Snapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
+    @Override
+    public Future<Void> doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) {
+        List<StoredSnapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
 
         if(snapshotList == null){
             snapshotList = new ArrayList<>();
             snapshots.put(snapshotMetadata.persistenceId(), snapshotList);
         }
-        snapshotList.add(new Snapshot(snapshotMetadata, o));
+        snapshotList.add(new StoredSnapshot(snapshotMetadata, o));
 
         return Futures.successful(null);
     }
 
-    @Override public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception {
+    @Override
+    public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception {
     }
 
-    @Override public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception {
-        List<Snapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
+    @Override
+    public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception {
+        List<StoredSnapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
 
         if(snapshotList == null){
             return;
@@ -64,7 +84,7 @@ public class InMemorySnapshotStore extends SnapshotStore {
         int deleteIndex = -1;
 
         for(int i=0;i<snapshotList.size(); i++){
-            Snapshot snapshot = snapshotList.get(i);
+            StoredSnapshot snapshot = snapshotList.get(i);
             if(snapshotMetadata.equals(snapshot.getMetadata())){
                 deleteIndex = i;
                 break;
@@ -77,9 +97,10 @@ public class InMemorySnapshotStore extends SnapshotStore {
 
     }
 
-    @Override public void doDelete(String s, SnapshotSelectionCriteria snapshotSelectionCriteria)
+    @Override
+    public void doDelete(String s, SnapshotSelectionCriteria snapshotSelectionCriteria)
         throws Exception {
-        List<Snapshot> snapshotList = snapshots.get(s);
+        List<StoredSnapshot> snapshotList = snapshots.get(s);
 
         if(snapshotList == null){
             return;
@@ -90,11 +111,11 @@ public class InMemorySnapshotStore extends SnapshotStore {
         snapshots.remove(s);
     }
 
-    private static class Snapshot {
+    private static class StoredSnapshot {
         private final SnapshotMetadata metadata;
         private final Object data;
 
-        private Snapshot(SnapshotMetadata metadata, Object data) {
+        private StoredSnapshot(SnapshotMetadata metadata, Object data) {
             this.metadata = metadata;
             this.data = data;
         }
index f0dadc6..3a37dd9 100644 (file)
@@ -1,5 +1,6 @@
 akka {
     persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+    persistence.journal.plugin = "in-memory-journal"
 
     loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"]
 
@@ -17,6 +18,10 @@ akka {
     }
 }
 
+in-memory-journal {
+    class = "org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal"
+}
+
 in-memory-snapshot-store {
   # Class name of the plugin.
   class = "org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore"
index b421e56..cde0157 100644 (file)
@@ -1,7 +1,5 @@
 package org.opendaylight.xsql;
 
-import java.util.concurrent.ExecutionException;
-
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.xsql.rev140626.XSQL;
@@ -25,16 +23,15 @@ public class XSQLProvider implements AutoCloseable {
             XSQLBuilder builder = new XSQLBuilder();
             builder.setPort("34343");
             XSQL xsql = builder.build();
-            if (dps != null) {
-                final DataModificationTransaction t = dps.beginTransaction();
-                t.removeOperationalData(ID);
-                t.putOperationalData(ID,xsql);
-
-                try {
+            try {
+                if (dps != null) {
+                    final DataModificationTransaction t = dps.beginTransaction();
+                    t.removeOperationalData(ID);
+                    t.putOperationalData(ID,xsql);
                     t.commit().get();
-                } catch (InterruptedException | ExecutionException e) {
-                   LOG.warn("Failed to update toaster status, operational otherwise", e);
                 }
+            } catch (Exception e) {
+                LOG.warn("Failed to update XSQL port status, ", e);
             }
         return xsql;
     }
index c32d960..a1bf2c8 100644 (file)
@@ -1,21 +1,21 @@
 <?xml version="1.0" encoding="UTF-8"?>\r
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">\r
 \r
-    <!--
-        Licensed to the Apache Software Foundation (ASF) under one or more
-        contributor license agreements.  See the NOTICE file distributed with
-        this work for additional information regarding copyright ownership.
-        The ASF licenses this file to You under the Apache License, Version 2.0
-        (the "License"); you may not use this file except in compliance with
-        the License.  You may obtain a copy of the License at
-
-            http://www.apache.org/licenses/LICENSE-2.0
-
-        Unless required by applicable law or agreed to in writing, software
-        distributed under the License is distributed on an "AS IS" BASIS,
-        WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-        See the License for the specific language governing permissions and
-        limitations under the License.
+    <!--\r
+        Licensed to the Apache Software Foundation (ASF) under one or more\r
+        contributor license agreements.  See the NOTICE file distributed with\r
+        this work for additional information regarding copyright ownership.\r
+        The ASF licenses this file to You under the Apache License, Version 2.0\r
+        (the "License"); you may not use this file except in compliance with\r
+        the License.  You may obtain a copy of the License at\r
+\r
+            http://www.apache.org/licenses/LICENSE-2.0\r
+\r
+        Unless required by applicable law or agreed to in writing, software\r
+        distributed under the License is distributed on an "AS IS" BASIS,\r
+        WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+        See the License for the specific language governing permissions and\r
+        limitations under the License.\r
     -->\r
 \r
     <modelVersion>4.0.0</modelVersion>\r
     <version>1.1-SNAPSHOT</version>\r
   </parent>\r
 \r
-    <groupId>xsqlcommand</groupId>\r
-    <artifactId>xsqlcommand</artifactId>\r
+    <groupId>org.opendaylight.controller</groupId>\r
+    <artifactId>sal-karaf-xsql</artifactId>\r
     <packaging>bundle</packaging>\r
-    <version>1.0.0-SNAPSHOT</version>\r
     <name>Apache Karaf :: Shell odl/xsql Commands</name>\r
 \r
     <description>Provides the OSGi odl commands</description>\r
@@ -64,7 +63,6 @@
         <dependency>\r
             <groupId>org.opendaylight.controller</groupId>\r
             <artifactId>sal-dom-xsql</artifactId>\r
-            <type>bundle</type>\r
             <version>1.1-SNAPSHOT</version>\r
         </dependency>\r
     </dependencies>\r
diff --git a/opendaylight/md-sal/sal-karaf-xsql/src/main/resources/OSGI-INF/blueprint/shell-log.xml b/opendaylight/md-sal/sal-karaf-xsql/src/main/resources/OSGI-INF/blueprint/shell-log.xml
new file mode 100644 (file)
index 0000000..e9a4a23
--- /dev/null
@@ -0,0 +1,12 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
+           xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.0.0">
+
+    <command-bundle xmlns="http://karaf.apache.org/xmlns/shell/v1.1.0">
+        <command>
+            <action class="org.opendaylight.controller.xsql.xsql">
+            </action>
+        </command>
+    </command-bundle>
+</blueprint>
index 2e2100b..e0a51a8 100644 (file)
@@ -4,6 +4,11 @@
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
 
+  <parent>
+     <groupId>org.opendaylight.controller.samples</groupId>
+     <artifactId>sal-samples</artifactId>
+     <version>1.1-SNAPSHOT</version>
+  </parent>
 
   <artifactId>l2switch.aggregator</artifactId>
   <groupId>org.opendaylight.controller.samples.l2switch</groupId>
index bc8efbe..1adcd7e 100644 (file)
@@ -107,9 +107,9 @@ public class NetconfITSecureTest extends AbstractNetconfConfigTest {
     /**
      * Test all requests are handled properly and no mismatch occurs in listener
      */
-    @Test(timeout = 5*60*1000)
+    @Test(timeout = 6*60*1000)
     public void testSecureStress() throws Exception {
-        final int requests = 10000;
+        final int requests = 4000;
 
         final NetconfClientDispatcher dispatch = new NetconfClientDispatcherImpl(getNettyThreadgroup(), getNettyThreadgroup(), getHashedWheelTimer());
         final NetconfDeviceCommunicator sessionListener = getSessionListener();
index 08441b4..4b5dcd7 100644 (file)
@@ -69,11 +69,11 @@ public class JaxBSerializerTest {
                 "<session-id>1</session-id>" +
                 "<in-bad-rpcs>0</in-bad-rpcs>" +
                 "<in-rpcs>0</in-rpcs>" +
-                "<login-time>loginTime</login-time>" +
+                "<login-time>2010-10-10T12:32:32Z</login-time>" +
                 "<out-notifications>0</out-notifications>" +
                 "<out-rpc-errors>0</out-rpc-errors>" +
                 "<ncme:session-identifier>client</ncme:session-identifier>" +
-                "<source-host>address/port</source-host>" +
+                "<source-host>192.168.1.1</source-host>" +
                 "<transport>ncme:netconf-tcp</transport>" +
                 "<username>username</username>" +
                 "</session>"));
@@ -96,8 +96,8 @@ public class JaxBSerializerTest {
         final Session1 mockedSession1 = mock(Session1.class);
         doReturn("client").when(mockedSession1).getSessionIdentifier();
         doReturn(1L).when(mocked).getSessionId();
-        doReturn(new DateAndTime("loginTime")).when(mocked).getLoginTime();
-        doReturn(new Host(new DomainName("address/port"))).when(mocked).getSourceHost();
+        doReturn(new DateAndTime("2010-10-10T12:32:32Z")).when(mocked).getLoginTime();
+        doReturn(new Host(new DomainName("192.168.1.1"))).when(mocked).getSourceHost();
         doReturn(new ZeroBasedCounter32(0L)).when(mocked).getInBadRpcs();
         doReturn(new ZeroBasedCounter32(0L)).when(mocked).getInRpcs();
         doReturn(new ZeroBasedCounter32(0L)).when(mocked).getOutNotifications();
diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHanderReader.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHanderReader.java
new file mode 100644 (file)
index 0000000..73a24f2
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandler;
+import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoReadFuture;
+import org.apache.sshd.common.util.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Listener on async input stream from SSH session.
+ * This listeners schedules reads in a loop until the session is closed or read fails.
+ */
+final class AsyncSshHanderReader implements SshFutureListener<IoReadFuture>, AutoCloseable {
+
+    private static final Logger logger = LoggerFactory.getLogger(AsyncSshHandler.class);
+
+    private static final int BUFFER_SIZE = 8192;
+
+    private final ChannelOutboundHandler asyncSshHandler;
+    private final ChannelHandlerContext ctx;
+
+    private IoInputStream asyncOut;
+    private Buffer buf;
+    private IoReadFuture currentReadFuture;
+
+    public AsyncSshHanderReader(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
+        this.asyncSshHandler = asyncSshHandler;
+        this.ctx = ctx;
+        this.asyncOut = asyncOut;
+        buf = new Buffer(BUFFER_SIZE);
+        asyncOut.read(buf).addListener(this);
+    }
+
+    @Override
+    public synchronized void operationComplete(final IoReadFuture future) {
+        if(future.getException() != null) {
+            if(asyncOut.isClosed() || asyncOut.isClosing()) {
+                // Ssh dropped
+                logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException());
+            } else {
+                logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
+            }
+            invokeDisconnect();
+            return;
+        }
+
+        if (future.getRead() > 0) {
+            ctx.fireChannelRead(Unpooled.wrappedBuffer(buf.array(), 0, future.getRead()));
+
+            // Schedule next read
+            buf = new Buffer(BUFFER_SIZE);
+            currentReadFuture = asyncOut.read(buf);
+            currentReadFuture.addListener(this);
+        }
+    }
+
+    private void invokeDisconnect() {
+        try {
+            asyncSshHandler.disconnect(ctx, ctx.newPromise());
+        } catch (final Exception e) {
+            // This should not happen
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    public synchronized void close() {
+        // Remove self as listener on close to prevent reading from closed input
+        if(currentReadFuture != null) {
+            currentReadFuture.removeListener(this);
+        }
+
+        asyncOut = null;
+    }
+}
index 3d1e478..3bd7232 100644 (file)
@@ -9,10 +9,7 @@
 package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
 
 import com.google.common.base.Preconditions;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundHandler;
 import io.netty.channel.ChannelOutboundHandlerAdapter;
 import io.netty.channel.ChannelPromise;
 import java.io.IOException;
@@ -25,12 +22,6 @@ import org.apache.sshd.client.future.ConnectFuture;
 import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
-import org.apache.sshd.common.io.IoInputStream;
-import org.apache.sshd.common.io.IoOutputStream;
-import org.apache.sshd.common.io.IoReadFuture;
-import org.apache.sshd.common.io.IoWriteFuture;
-import org.apache.sshd.common.io.WritePendingException;
-import org.apache.sshd.common.util.Buffer;
 import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,8 +47,8 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
     private final AuthenticationHandler authenticationHandler;
     private final SshClient sshClient;
 
-    private SshReadAsyncListener sshReadAsyncListener;
-    private SshWriteAsyncHandler sshWriteAsyncHandler;
+    private AsyncSshHanderReader sshReadAsyncListener;
+    private AsyncSshHandlerWriter sshWriteAsyncHandler;
 
     private ClientChannel channel;
     private ClientSession session;
@@ -147,10 +138,10 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         connectPromise.setSuccess();
         connectPromise = null;
 
-        sshReadAsyncListener = new SshReadAsyncListener(this, ctx, channel.getAsyncOut());
+        sshReadAsyncListener = new AsyncSshHanderReader(this, ctx, channel.getAsyncOut());
         // if readAsyncListener receives immediate close, it will close this handler and closing this handler sets channel variable to null
         if(channel != null) {
-            sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
+            sshWriteAsyncHandler = new AsyncSshHandlerWriter(channel.getAsyncIn());
             ctx.fireChannelActive();
         }
     }
@@ -207,178 +198,4 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         ctx.fireChannelInactive();
     }
 
-    /**
-     * Listener over async input stream from SSH session.
-     * This listeners schedules reads in a loop until the session is closed or read fails.
-     */
-    private static class SshReadAsyncListener implements SshFutureListener<IoReadFuture>, AutoCloseable {
-        private static final int BUFFER_SIZE = 8192;
-
-        private final ChannelOutboundHandler asyncSshHandler;
-        private final ChannelHandlerContext ctx;
-
-        private IoInputStream asyncOut;
-        private Buffer buf;
-        private IoReadFuture currentReadFuture;
-
-        public SshReadAsyncListener(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
-            this.asyncSshHandler = asyncSshHandler;
-            this.ctx = ctx;
-            this.asyncOut = asyncOut;
-            buf = new Buffer(BUFFER_SIZE);
-            asyncOut.read(buf).addListener(this);
-        }
-
-        @Override
-        public synchronized void operationComplete(final IoReadFuture future) {
-            if(future.getException() != null) {
-                if(asyncOut.isClosed() || asyncOut.isClosing()) {
-                    // Ssh dropped
-                    logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException());
-                } else {
-                    logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
-                }
-                invokeDisconnect();
-                return;
-            }
-
-            if (future.getRead() > 0) {
-                ctx.fireChannelRead(Unpooled.wrappedBuffer(buf.array(), 0, future.getRead()));
-
-                // Schedule next read
-                buf = new Buffer(BUFFER_SIZE);
-                currentReadFuture = asyncOut.read(buf);
-                currentReadFuture.addListener(this);
-            }
-        }
-
-        private void invokeDisconnect() {
-            try {
-                asyncSshHandler.disconnect(ctx, ctx.newPromise());
-            } catch (final Exception e) {
-                // This should not happen
-                throw new IllegalStateException(e);
-            }
-        }
-
-        @Override
-        public synchronized void close() {
-            // Remove self as listener on close to prevent reading from closed input
-            if(currentReadFuture != null) {
-                currentReadFuture.removeListener(this);
-            }
-
-            asyncOut = null;
-        }
-    }
-
-    private static final class SshWriteAsyncHandler implements AutoCloseable {
-        public static final int MAX_PENDING_WRITES = 100;
-
-        private final ChannelOutboundHandler channelHandler;
-        private IoOutputStream asyncIn;
-
-        // Counter that holds the amount of pending write messages
-        // Pending write can occur in case remote window is full
-        // In such case, we need to wait for the pending write to finish
-        private int pendingWriteCounter;
-        // Last write future, that can be pending
-        private IoWriteFuture lastWriteFuture;
-
-        public SshWriteAsyncHandler(final ChannelOutboundHandler channelHandler, final IoOutputStream asyncIn) {
-            this.channelHandler = channelHandler;
-            this.asyncIn = asyncIn;
-        }
-
-        int c = 0;
-
-        public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
-            try {
-                if(asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) {
-                    // If we are closed/closing, set immediate fail
-                    promise.setFailure(new IllegalStateException("Channel closed"));
-                } else {
-                    lastWriteFuture = asyncIn.write(toBuffer(msg));
-                    lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
-
-                        @Override
-                        public void operationComplete(final IoWriteFuture future) {
-                            ((ByteBuf) msg).release();
-
-                            // Notify success or failure
-                            if (future.isWritten()) {
-                                promise.setSuccess();
-                            } else {
-                                promise.setFailure(future.getException());
-                            }
-
-                            // Reset last pending future
-                            synchronized (SshWriteAsyncHandler.this) {
-                                lastWriteFuture = null;
-                            }
-                        }
-                    });
-                }
-            } catch (final WritePendingException e) {
-                // Check limit for pending writes
-                pendingWriteCounter++;
-                if(pendingWriteCounter > MAX_PENDING_WRITES) {
-                    promise.setFailure(e);
-                    handlePendingFailed(ctx, new IllegalStateException("Too much pending writes(" + MAX_PENDING_WRITES + ") on channel: " + ctx.channel() +
-                            ", remote window is not getting read or is too small"));
-                }
-
-                // We need to reset buffer read index, since we've already read it when we tried to write it the first time
-                ((ByteBuf) msg).resetReaderIndex();
-                logger.debug("Write pending to SSH remote on channel: {}, current pending count: {}", ctx.channel(), pendingWriteCounter);
-
-                // In case of pending, re-invoke write after pending is finished
-                Preconditions.checkNotNull(lastWriteFuture, "Write is pending, but there was no previous write attempt", e);
-                lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
-                    @Override
-                    public void operationComplete(final IoWriteFuture future) {
-                        // FIXME possible minor race condition, we cannot guarantee that this callback when pending is finished will be executed first
-                        // External thread could trigger write on this instance while we are on this line
-                        // Verify
-                        if (future.isWritten()) {
-                            synchronized (SshWriteAsyncHandler.this) {
-                                // Pending done, decrease counter
-                                pendingWriteCounter--;
-                                write(ctx, msg, promise);
-                            }
-                        } else {
-                            // Cannot reschedule pending, fail
-                            handlePendingFailed(ctx, e);
-                        }
-                    }
-
-                });
-            }
-        }
-
-        private void handlePendingFailed(final ChannelHandlerContext ctx, final Exception e) {
-            logger.warn("Exception while writing to SSH remote on channel {}", ctx.channel(), e);
-            try {
-                channelHandler.disconnect(ctx, ctx.newPromise());
-            } catch (final Exception ex) {
-                // This should not happen
-                throw new IllegalStateException(ex);
-            }
-        }
-
-        @Override
-        public void close() {
-            asyncIn = null;
-        }
-
-        private Buffer toBuffer(final Object msg) {
-            // TODO Buffer vs ByteBuf translate, Can we handle that better ?
-            Preconditions.checkState(msg instanceof ByteBuf);
-            final ByteBuf byteBuf = (ByteBuf) msg;
-            final byte[] temp = new byte[byteBuf.readableBytes()];
-            byteBuf.readBytes(temp, 0, byteBuf.readableBytes());
-            return new Buffer(temp);
-        }
-
-    }
 }
diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java
new file mode 100644 (file)
index 0000000..eace0ac
--- /dev/null
@@ -0,0 +1,173 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.Queue;
+import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoOutputStream;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.io.WritePendingException;
+import org.apache.sshd.common.util.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Async Ssh writer. Takes messages(byte arrays) and sends them encrypted to remote server.
+ * Also handles pending writes by caching requests until pending state is over.
+ */
+final class AsyncSshHandlerWriter implements AutoCloseable {
+
+    private static final Logger logger = LoggerFactory
+            .getLogger(AsyncSshHandlerWriter.class);
+
+    // public static final int MAX_PENDING_WRITES = 1000;
+    // TODO implement Limiting mechanism for pending writes
+    // But there is a possible issue with limiting:
+    // 1. What to do when queue is full ? Immediate Fail for every request ?
+    // 2. At this level we might be dealing with Chunks of messages(not whole messages) and unexpected behavior might occur
+    // when we send/queue 1 chunk and fail the other chunks
+
+    private IoOutputStream asyncIn;
+
+    // Order has to be preserved for queued writes
+    private final Deque<PendingWriteRequest> pending = new LinkedList<>();
+
+    public AsyncSshHandlerWriter(final IoOutputStream asyncIn) {
+        this.asyncIn = asyncIn;
+    }
+
+    public synchronized void write(final ChannelHandlerContext ctx,
+            final Object msg, final ChannelPromise promise) {
+        // TODO check for isClosed, isClosing might be performed by mina SSH internally and is not required here
+        // If we are closed/closing, set immediate fail
+        if (asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) {
+            promise.setFailure(new IllegalStateException("Channel closed"));
+        } else {
+            final ByteBuf byteBufMsg = (ByteBuf) msg;
+            if (pending.isEmpty() == false) {
+                queueRequest(ctx, byteBufMsg, promise);
+                return;
+            }
+
+            writeWithPendingDetection(ctx, promise, byteBufMsg);
+        }
+    }
+
+    private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg) {
+        try {
+            if (logger.isTraceEnabled()) {
+                logger.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
+            }
+            asyncIn.write(toBuffer(byteBufMsg)).addListener(new SshFutureListener<IoWriteFuture>() {
+
+                        @Override
+                        public void operationComplete(final IoWriteFuture future) {
+                            if (logger.isTraceEnabled()) {
+                                logger.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
+                                        ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
+                            }
+
+                            // Notify success or failure
+                            if (future.isWritten()) {
+                                promise.setSuccess();
+                            } else {
+                                logger.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException());
+                                promise.setFailure(future.getException());
+                            }
+
+                            // Not needed anymore, release
+                            byteBufMsg.release();
+
+                            // Check pending queue and schedule next
+                            // At this time we are guaranteed that we are not in pending state anymore so the next request should succeed
+                            writePendingIfAny();
+                        }
+                    });
+        } catch (final WritePendingException e) {
+            queueRequest(ctx, byteBufMsg, promise);
+        }
+    }
+
+    private synchronized void writePendingIfAny() {
+        if (pending.peek() == null) {
+            return;
+        }
+
+        // In case of pending, reschedule next message from queue
+        final PendingWriteRequest pendingWrite = pending.poll();
+        final ByteBuf msg = pendingWrite.msg;
+        if (logger.isTraceEnabled()) {
+            logger.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg));
+        }
+
+        writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg);
+    }
+
+    private static String byteBufToString(final ByteBuf msg) {
+        msg.resetReaderIndex();
+        final String s = msg.toString(Charsets.UTF_8);
+        msg.resetReaderIndex();
+        return s;
+    }
+
+    private void queueRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
+//        try {
+        logger.debug("Write pending on channel: {}, queueing, current queue size: {}", ctx.channel(), pending.size());
+        if (logger.isTraceEnabled()) {
+            logger.trace("Queueing request due to pending: {}", byteBufToString(msg));
+        }
+        new PendingWriteRequest(ctx, msg, promise).pend(pending);
+//        } catch (final Exception ex) {
+//            logger.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}", ctx.channel(), ex, byteBufToString(msg));
+//            msg.release();
+//            promise.setFailure(ex);
+//        }
+    }
+
+    @Override
+    public synchronized void close() {
+        asyncIn = null;
+    }
+
+    private Buffer toBuffer(final ByteBuf msg) {
+        // TODO Buffer vs ByteBuf translate, Can we handle that better ?
+        final byte[] temp = new byte[msg.readableBytes()];
+        msg.readBytes(temp, 0, msg.readableBytes());
+        return new Buffer(temp);
+    }
+
+    private static final class PendingWriteRequest {
+        private final ChannelHandlerContext ctx;
+        private final ByteBuf msg;
+        private final ChannelPromise promise;
+
+        public PendingWriteRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
+            this.ctx = ctx;
+            // Reset reader index, last write (failed) attempt moved index to the end
+            msg.resetReaderIndex();
+            this.msg = msg;
+            this.promise = promise;
+        }
+
+        public void pend(final Queue<PendingWriteRequest> pending) {
+            // Preconditions.checkState(pending.size() < MAX_PENDING_WRITES,
+            // "Too much pending writes(%s) on channel: %s, remote window is not getting read or is too small",
+            // pending.size(), ctx.channel());
+            Preconditions.checkState(pending.offer(this), "Cannot pend another request write (pending count: %s) on channel: %s",
+                    pending.size(), ctx.channel());
+        }
+    }
+}
index 223f2c7..d0fc43d 100644 (file)
@@ -23,12 +23,9 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.verifyZeroInteractions;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import java.io.IOException;
 import java.net.SocketAddress;
 
-import java.nio.channels.WritePendingException;
 import org.apache.sshd.ClientChannel;
 import org.apache.sshd.ClientSession;
 import org.apache.sshd.SshClient;
@@ -46,6 +43,7 @@ import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.util.Buffer;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.Matchers;
 import org.mockito.Mock;
@@ -59,6 +57,8 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
@@ -351,19 +351,16 @@ public class AsyncSshHandlerTest {
 
         // make first write stop pending
         firstWriteListener.operationComplete(ioWriteFuture);
-        // intercept third listener, this is regular listener for second write to determine success or failure
-        final ListenableFuture<SshFutureListener<IoWriteFuture>> afterPendingListener = stubAddListener(ioWriteFuture);
 
         // notify listener for second write that pending has ended
         pendingListener.get().operationComplete(ioWriteFuture);
-        // Notify third listener (regular listener for second write) that second write succeeded
-        afterPendingListener.get().operationComplete(ioWriteFuture);
 
         // verify both write promises successful
         verify(firstWritePromise).setSuccess();
         verify(secondWritePromise).setSuccess();
     }
 
+    @Ignore("Pending queue is not limited")
     @Test
     public void testWritePendingMax() throws Exception {
         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
@@ -389,11 +386,11 @@ public class AsyncSshHandlerTest {
         final ChannelPromise secondWritePromise = getMockedPromise();
         // now make write throw pending exception
         doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
-        for (int i = 0; i < 1000; i++) {
+        for (int i = 0; i < 1001; i++) {
             asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
         }
 
-        verify(ctx).fireChannelInactive();
+        verify(secondWritePromise, times(1)).setFailure(any(Throwable.class));
     }
 
     @Test
index 728316a..19bcfe4 100644 (file)
@@ -66,6 +66,7 @@
                             org.eclipse.persistence.jaxb.rs,
                             com.sun.jersey.spi.container.servlet,
                             javax.ws.rs,
+                            javax.ws.rs.ext,
                             javax.ws.rs.core,
                             javax.xml.bind.annotation,
                             javax.xml.bind,
index 7802dbb..5235030 100644 (file)
@@ -15,7 +15,6 @@ import org.codehaus.enunciate.jaxrs.ResponseCode;
 import org.codehaus.enunciate.jaxrs.StatusCodes;
 import org.opendaylight.controller.networkconfig.neutron.INeutronLoadBalancerPoolAware;
 import org.opendaylight.controller.networkconfig.neutron.INeutronLoadBalancerPoolCRUD;
-import org.opendaylight.controller.networkconfig.neutron.INeutronLoadBalancerPoolMemberCRUD;
 import org.opendaylight.controller.networkconfig.neutron.NeutronCRUDInterfaces;
 import org.opendaylight.controller.networkconfig.neutron.NeutronLoadBalancerPool;
 import org.opendaylight.controller.networkconfig.neutron.NeutronLoadBalancerPoolMember;
@@ -62,8 +61,7 @@ import java.util.List;
 
 /**
  * For now, the LB pool member data is maintained with the INeutronLoadBalancerPoolCRUD,
- * although there may be an overlap with INeutronLoadBalancerPoolMemberCRUD's cache.
- * TODO: Consolidate and maintain a single copy
+ * and not duplicated within the INeutronLoadBalancerPoolMemberCRUD's cache.
  */
 
 @Path("/pools")
@@ -393,21 +391,6 @@ public class NeutronLoadBalancerPoolNorthbound {
                 service.neutronLoadBalancerPoolDeleted(singleton);
             }
         }
-
-        /*
-         * remove corresponding members from the member cache too
-         */
-        INeutronLoadBalancerPoolMemberCRUD loadBalancerPoolMemberInterface = NeutronCRUDInterfaces.getINeutronLoadBalancerPoolMemberCRUD(this);
-        if (loadBalancerPoolMemberInterface != null) {
-            List<NeutronLoadBalancerPoolMember> allLoadBalancerPoolMembers = new
-                ArrayList<NeutronLoadBalancerPoolMember>(loadBalancerPoolMemberInterface.getAllNeutronLoadBalancerPoolMembers());
-            Iterator<NeutronLoadBalancerPoolMember> i = allLoadBalancerPoolMembers.iterator();
-            while (i.hasNext()) {
-                NeutronLoadBalancerPoolMember member = i.next();
-                if (member.getPoolID() == loadBalancerPoolUUID)
-                    loadBalancerPoolMemberInterface.removeNeutronLoadBalancerPoolMember(member.getPoolMemberID());
-            }
-        }
         return Response.status(204).build();
     }
 }