Merge "Introduce a mechanism for a Follower to signal it's sync up status"
authorTom Pantelis <tpanteli@brocade.com>
Wed, 4 Mar 2015 01:22:39 +0000 (01:22 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 4 Mar 2015 01:22:39 +0000 (01:22 +0000)
14 files changed:
features/mdsal/pom.xml
karaf/opendaylight-karaf-empty/pom.xml
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyJournalEntries.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyLogEntries.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockAkkaJournal.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCommitCallbackTest.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMNotificationRouter.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceSalProvider.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/NormalizedNodeJsonBodyWriter.java

index 35edb286ab49a003389b9e2cf93b10d3123af674..5b5c1b94e09f78cada5baf183648ee30054fa0ab 100644 (file)
       <type>xml</type>
       <classifier>config</classifier>
     </dependency>
-
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-clustering-config</artifactId>
+      <version>${mdsal.version}</version>
+      <type>cfg</type>
+      <classifier>datastore</classifier>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-distributed-datastore</artifactId>
index aa772096cd958307341bbff1e652224adec0aa6f..ae3a03f328692ab88ce33eb4907a7ee1bf3e5c34 100644 (file)
 <?xml version="1.0" encoding="UTF-8"?>
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.opendaylight.controller</groupId>
-    <artifactId>commons.opendaylight</artifactId>
+    <artifactId>karaf-parent</artifactId>
     <version>1.5.0-SNAPSHOT</version>
-    <relativePath>../../opendaylight/commons/opendaylight</relativePath>
+    <relativePath/>
   </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.opendaylight.controller</groupId>
   <artifactId>opendaylight-karaf-empty</artifactId>
-  <packaging>pom</packaging>
-
+  <version>1.5.0-SNAPSHOT</version>
+  <name>${project.artifactId}</name>
+  <prerequisites>
+    <maven>3.1.1</maven>
+  </prerequisites>
   <dependencies>
     <dependency>
       <!-- scope is compile so all features (there is only one) are installed
-            into startup.properties and the feature repo itself is not installed -->
+      into startup.properties and the feature repo itself is not installed -->
       <groupId>org.apache.karaf.features</groupId>
       <artifactId>framework</artifactId>
-      <version>${karaf.version}</version>
       <type>kar</type>
     </dependency>
-    <!-- scope is runtime so the feature repo is listed in the features
-      service config file, and features may be installed using the
-      karaf-maven-plugin configuration -->
-    <dependency>
-      <groupId>org.apache.karaf.features</groupId>
-      <artifactId>standard</artifactId>
-      <version>${karaf.version}</version>
-      <classifier>features</classifier>
-      <type>xml</type>
-      <scope>runtime</scope>
-    </dependency>
-
-    <!-- ODL Branding -->
-    <dependency>
-      <groupId>org.opendaylight.controller</groupId>
-      <artifactId>karaf.branding</artifactId>
-      <scope>compile</scope>
-    </dependency>
-
-    <!-- Resources needed -->
-    <dependency>
-      <groupId>org.opendaylight.controller</groupId>
-      <artifactId>opendaylight-karaf-resources</artifactId>
-      <version>${project.version}</version>
-    </dependency>
   </dependencies>
-
-  <build>
-    <pluginManagement>
-      <plugins>
-        <plugin>
-          <groupId>org.eclipse.m2e</groupId>
-          <artifactId>lifecycle-mapping</artifactId>
-          <version>1.0.0</version>
-          <configuration>
-            <lifecycleMappingMetadata>
-              <pluginExecutions>
-                <pluginExecution>
-                  <pluginExecutionFilter>
-                    <groupId>org.apache.felix</groupId>
-                    <artifactId>maven-bundle-plugin</artifactId>
-                    <versionRange>[0,)</versionRange>
-                    <goals>
-                      <goal>cleanVersions</goal>
-                    </goals>
-                  </pluginExecutionFilter>
-                  <action>
-                    <ignore></ignore>
-                  </action>
-                </pluginExecution>
-                <pluginExecution>
-                  <pluginExecutionFilter>
-                    <groupId>org.apache.maven.plugins</groupId>
-                    <artifactId>maven-dependency-plugin</artifactId>
-                    <versionRange>[0,)</versionRange>
-                    <goals>
-                      <goal>copy</goal>
-                      <goal>unpack</goal>
-                    </goals>
-                  </pluginExecutionFilter>
-                  <action>
-                    <ignore></ignore>
-                  </action>
-                </pluginExecution>
-                <pluginExecution>
-                  <pluginExecutionFilter>
-                    <groupId>org.apache.karaf.tooling</groupId>
-                    <artifactId>karaf-maven-plugin</artifactId>
-                    <versionRange>[0,)</versionRange>
-                    <goals>
-                      <goal>commands-generate-help</goal>
-                    </goals>
-                  </pluginExecutionFilter>
-                  <action>
-                    <ignore></ignore>
-                  </action>
-                </pluginExecution>
-                <pluginExecution>
-                  <pluginExecutionFilter>
-                    <groupId>org.fusesource.scalate</groupId>
-                    <artifactId>maven-scalate-plugin</artifactId>
-                    <versionRange>[0,)</versionRange>
-                    <goals>
-                      <goal>sitegen</goal>
-                    </goals>
-                  </pluginExecutionFilter>
-                  <action>
-                    <ignore></ignore>
-                  </action>
-                </pluginExecution>
-                <pluginExecution>
-                  <pluginExecutionFilter>
-                    <groupId>org.apache.servicemix.tooling</groupId>
-                    <artifactId>depends-maven-plugin</artifactId>
-                    <versionRange>[0,)</versionRange>
-                    <goals>
-                      <goal>generate-depends-file</goal>
-                    </goals>
-                  </pluginExecutionFilter>
-                  <action>
-                    <ignore></ignore>
-                  </action>
-                </pluginExecution>
-              </pluginExecutions>
-            </lifecycleMappingMetadata>
-          </configuration>
-        </plugin>
-      </plugins>
-    </pluginManagement>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.karaf.tooling</groupId>
-        <artifactId>karaf-maven-plugin</artifactId>
-        <version>${karaf.version}</version>
-        <extensions>true</extensions>
-        <executions>
-          <execution>
-            <id>process-resources</id>
-            <goals>
-              <goal>install-kars</goal>
-            </goals>
-            <phase>process-resources</phase>
-          </execution>
-          <execution>
-            <id>package</id>
-            <goals>
-              <goal>instance-create-archive</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-checkstyle-plugin</artifactId>
-        <configuration>
-          <excludes>**\/target\/,**\/bin\/,**\/target-ide\/,**\/configuration\/initial\/</excludes>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-dependency-plugin</artifactId>
-        <version>2.6</version>
-        <executions>
-          <execution>
-            <id>copy</id>
-            <goals>
-              <goal>copy</goal>
-            </goals>
-            <!-- here the phase you need -->
-            <phase>generate-resources</phase>
-            <configuration>
-              <artifactItems>
-                <artifactItem>
-                  <groupId>org.opendaylight.controller</groupId>
-                  <artifactId>karaf.branding</artifactId>
-                  <version>${karaf.branding.version}</version>
-                  <outputDirectory>target/assembly/lib</outputDirectory>
-                  <destFileName>karaf.branding-${branding.version}.jar</destFileName>
-                </artifactItem>
-              </artifactItems>
-            </configuration>
-          </execution>
-          <execution>
-            <id>unpack-karaf-resources</id>
-            <goals>
-              <goal>unpack-dependencies</goal>
-            </goals>
-            <phase>prepare-package</phase>
-            <configuration>
-             <outputDirectory>${project.build.directory}/assembly</outputDirectory>
-             <groupId>org.opendaylight.controller</groupId>
-             <includeArtifactIds>opendaylight-karaf-resources</includeArtifactIds>
-             <excludes>META-INF\/**</excludes>
-             <excludeTransitive>true</excludeTransitive>
-             <ignorePermissions>false</ignorePermissions>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-antrun-plugin</artifactId>
-        <executions>
-            <execution>
-                <phase>prepare-package</phase>
-                <goals>
-                    <goal>run</goal>
-                </goals>
-                <configuration>
-                  <tasks>
-                    <chmod perm="755">
-                        <fileset dir="${project.build.directory}/assembly/bin">
-                          <include name="karaf"/>
-                          <include name="instance"/>
-                        </fileset>
-                    </chmod>
-                  </tasks>
-                </configuration>
-            </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
   <scm>
     <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
     <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
index 285be39c0b3286c2abbdaf3c08e09c1f85c0224f..cb1b42aa1b1fb95ced067fc152e0760a3ce0576f 100644 (file)
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -147,7 +148,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             } else if (message instanceof ReplicatedLogEntry) {
                 onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
             } else if (message instanceof ApplyLogEntries) {
-                onRecoveredApplyLogEntries((ApplyLogEntries) message);
+                // Handle this message for backwards compatibility with pre-Lithium versions.
+                onRecoveredApplyLogEntries(((ApplyLogEntries) message).getToIndex());
+            } else if (message instanceof ApplyJournalEntries) {
+                onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
             } else if (message instanceof DeleteEntries) {
                 replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
             } else if (message instanceof UpdateElectionTerm) {
@@ -209,18 +213,18 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         replicatedLog.append(logEntry);
     }
 
-    private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
+    private void onRecoveredApplyLogEntries(long toIndex) {
         if(LOG.isDebugEnabled()) {
             LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
-                    persistenceId(), context.getLastApplied() + 1, ale.getToIndex());
+                    persistenceId(), context.getLastApplied() + 1, toIndex);
         }
 
-        for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
+        for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
             batchRecoveredLogEntry(replicatedLog.get(i));
         }
 
-        context.setLastApplied(ale.getToIndex());
-        context.setCommitIndex(ale.getToIndex());
+        context.setLastApplied(toIndex);
+        context.setCommitIndex(toIndex);
     }
 
     private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
@@ -297,14 +301,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             applyState(applyState.getClientActor(), applyState.getIdentifier(),
                 applyState.getReplicatedLogEntry().getData());
 
-        } else if (message instanceof ApplyLogEntries){
-            ApplyLogEntries ale = (ApplyLogEntries) message;
+        } else if (message instanceof ApplyJournalEntries){
+            ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
             if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), ale.getToIndex());
+                LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), applyEntries.getToIndex());
             }
-            persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
+            persistence().persist(applyEntries, new Procedure<ApplyJournalEntries>() {
                 @Override
-                public void apply(ApplyLogEntries param) throws Exception {
+                public void apply(ApplyJournalEntries param) throws Exception {
                 }
             });
 
@@ -424,9 +428,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                             // Apply the state immediately
                             applyState(clientActor, identifier, data);
 
-                            // Send a ApplyLogEntries message so that we write the fact that we applied
+                            // Send a ApplyJournalEntries message so that we write the fact that we applied
                             // the state to durable storage
-                            self().tell(new ApplyLogEntries((int) replicatedLogEntry.getIndex()), self());
+                            self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
 
                             // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
                             if(!context.isSnapshotCaptureInitiated()){
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyJournalEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyJournalEntries.java
new file mode 100644 (file)
index 0000000..ca251d2
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+import java.io.Serializable;
+
+/**
+ * This is an internal message that is stored in the akka's persistent journal. During recovery, this
+ * message is used to apply recovered journal entries to the state whose indexes range from the context's
+ * current lastApplied index to "toIndex" contained in the message. This message is sent internally from a
+ * behavior to the RaftActor to persist.
+ *
+ * @author Thomas Pantelis
+ */
+public class ApplyJournalEntries implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final long toIndex;
+
+    public ApplyJournalEntries(long toIndex) {
+        this.toIndex = toIndex;
+    }
+
+    public long getToIndex() {
+        return toIndex;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("ApplyJournalEntries [toIndex=").append(toIndex).append("]");
+        return builder.toString();
+    }
+}
index c395915a0fb5763ab733774b5a4680c97553afe0..744d0098419576696d289768e10d813ad09c4176 100644 (file)
@@ -18,9 +18,11 @@ import java.io.Serializable;
  * This class is also used as a internal message sent from Behaviour to
  * RaftActor to persist the ApplyLogEntries
  *
+ * @deprecated Deprecated in favor of ApplyJournalEntries whose type for toIndex is long instead of int.
+ *             This class was kept for backwards compatibility with Helium.
  */
+@Deprecated
 public class ApplyLogEntries implements Serializable {
-    private static final long serialVersionUID = 1L;
     private final int toIndex;
 
     public ApplyLogEntries(int toIndex) {
index 0b0b4c7cd642480f92dd600a4f8f10be07977dc4..ef5f11e37aef4fe7490887a27d91caedb0e50c51 100644 (file)
@@ -17,7 +17,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
@@ -393,7 +393,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         // will be used during recovery
         //in case if the above code throws an error and this message is not sent, it would be fine
         // as the  append entries received later would initiate add this message to the journal
-        actor().tell(new ApplyLogEntries((int) context.getLastApplied()), actor());
+        actor().tell(new ApplyJournalEntries(context.getLastApplied()), actor());
     }
 
     protected Object fromSerializableMessage(Object serializable){
index 56bfc21f23c2047a19d5a49c49861ae1bf6eca5e..c0bdc53c51f27907da2de0e2399d619cee8fdda1 100644 (file)
@@ -55,6 +55,7 @@ import org.junit.Test;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -396,10 +397,11 @@ public class RaftActorTest extends AbstractActorTest {
 
             MockAkkaJournal.addToJournal(5, entry2);
             // 2 entries are applied to state besides the 4 entries in snapshot
-            MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
+            MockAkkaJournal.addToJournal(6, new ApplyJournalEntries(lastAppliedToState));
             MockAkkaJournal.addToJournal(7, entry3);
             MockAkkaJournal.addToJournal(8, entry4);
 
+
             // kill the actor
             followerActor.tell(PoisonPill.getInstance(), null);
             expectMsgClass(duration("5 seconds"), Terminated.class);
@@ -423,6 +425,46 @@ public class RaftActorTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            String persistenceId = factory.generateActorId("leader-");
+
+            DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+            config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+            // Setup the persisted journal with some entries
+            ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
+                    new MockRaftActorContext.MockPayload("zero"));
+            ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
+                    new MockRaftActorContext.MockPayload("oen"));
+            ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
+                    new MockRaftActorContext.MockPayload("two"));
+
+            long seqNr = 1;
+            MockAkkaJournal.addToJournal(seqNr++, entry0);
+            MockAkkaJournal.addToJournal(seqNr++, entry1);
+            MockAkkaJournal.addToJournal(seqNr++, new ApplyLogEntries(1));
+            MockAkkaJournal.addToJournal(seqNr++, entry2);
+
+            int lastAppliedToState = 1;
+            int lastIndex = 2;
+
+            //reinstate the actor
+            TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
+                    MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
+                            Optional.<ConfigParams>of(config)));
+
+            leaderActor.underlyingActor().waitForRecoveryComplete();
+
+            RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
+            assertEquals("Journal log size", 3, context.getReplicatedLog().size());
+            assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
+            assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
+            assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
+        }};
+    }
+
     /**
      * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
      * process recovery messages
@@ -471,7 +513,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 assertEquals("add replicated log entry", 2, replicatedLog.size());
 
-                mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
+                mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
 
                 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
 
@@ -538,7 +580,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 assertEquals("add replicated log entry", 0, replicatedLog.size());
 
-                mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
+                mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
 
                 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
 
@@ -641,7 +683,7 @@ public class RaftActorTest extends AbstractActorTest {
     }
 
     @Test
-    public void testApplyLogEntriesCallsDataPersistence() throws Exception {
+    public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
         new JavaTestKit(getSystem()) {
             {
                 String persistenceId = factory.generateActorId("leader-");
@@ -659,7 +701,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.waitForInitializeBehaviorComplete();
 
-                mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
+                mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
 
                 verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
 
index c57fce1cd553d8c41dd786d9c4bb6f33e97791b6..6964db51f273f52ac0591b80cf75edf5997ec68c 100644 (file)
@@ -27,7 +27,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
@@ -929,12 +929,12 @@ public class LeaderTest extends AbstractLeaderTest {
 
         assertEquals(2, leaderActorContext.getCommitIndex());
 
-        ApplyLogEntries applyLogEntries = MessageCollectorActor.expectFirstMatching(
-                leaderActor, ApplyLogEntries.class);
+        ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
+                leaderActor, ApplyJournalEntries.class);
 
         assertEquals(2, leaderActorContext.getLastApplied());
 
-        assertEquals(2, applyLogEntries.getToIndex());
+        assertEquals(2, applyJournalEntries.getToIndex());
 
         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
                 ApplyState.class);
index 85edc07bc5c683a136d0bbb93f534ecc6f2e7103..47864be41161a060c71b35f52ce835dbd0064b61 100644 (file)
@@ -15,14 +15,13 @@ import akka.persistence.PersistentImpl;
 import akka.persistence.PersistentRepr;
 import akka.persistence.journal.japi.AsyncWriteJournal;
 import com.google.common.collect.Maps;
-import scala.concurrent.Future;
-
 import java.util.Map;
 import java.util.concurrent.Callable;
+import scala.concurrent.Future;
 
 public class MockAkkaJournal extends AsyncWriteJournal {
 
-    private static Map<Long, Object> journal = Maps.newHashMap();
+    private static Map<Long, Object> journal = Maps.newLinkedHashMap();
 
     public static void addToJournal(long sequenceNr, Object message) {
         journal.put(sequenceNr, message);
index 4b0651a48eb07f4814bd990386bbe94d32159dfd..bbbc4db5e351f99df3856e807b146f18ac1899d1 100644 (file)
@@ -74,6 +74,7 @@ import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -492,7 +493,7 @@ public class ShardTest extends AbstractActorTest {
         }
 
         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
-                new ApplyLogEntries(nListEntries));
+                new ApplyJournalEntries(nListEntries));
 
         testRecovery(listEntryKeys);
     }
index b037627badc3a4d1745c3047fa07b73d1456d67a..f97e32547261727f842c087e1e53ef502d1d40ea 100644 (file)
@@ -68,14 +68,6 @@ public class TransactionRateLimitingCommitCallbackTest {
         commitCallback.success();
 
         verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(292)));
-
-        doReturn(TimeUnit.MILLISECONDS.toNanos(0) * 1D).when(commitSnapshot).getValue(0.1);
-
-        commitCallback = new TransactionRateLimitingCallback(actorContext);
-        commitCallback.run();
-        commitCallback.success();
-
-        verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(292)));
     }
 
     @Test
@@ -188,7 +180,7 @@ public class TransactionRateLimitingCommitCallbackTest {
 
             @Override
             public void describeTo(Description description) {
-
+                description.appendText("> " + val +" < " + (val+1));
             }
         };
     }
index aac425b3d400ca9ed530230a0359740f4f3db74c..b9972fc0a09f9773db0270e51198209438df010e 100644 (file)
@@ -79,6 +79,7 @@ public final class DOMNotificationRouter implements AutoCloseable, DOMNotificati
         final ExecutorService executor = Executors.newCachedThreadPool();
         final Disruptor<DOMNotificationRouterEvent> disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY, queueDepth, executor, ProducerType.MULTI, DEFAULT_STRATEGY);
 
+        disruptor.handleEventsWith(DISPATCH_NOTIFICATIONS);
         disruptor.after(DISPATCH_NOTIFICATIONS).handleEventsWith(NOTIFY_FUTURE);
         disruptor.start();
 
index dfae165d30c3fba4059b0c4c4aa5fb9308b825f6..568ebde0d393aca1b1d66c660d2b8f801f1dad8b 100644 (file)
@@ -91,6 +91,8 @@ final class NetconfDeviceSalProvider implements AutoCloseable, Provider, Binding
         mountInstance.close();
         datastoreAdapter.close();
         datastoreAdapter = null;
+        topologyDatastoreAdapter.close();
+        topologyDatastoreAdapter = null;
     }
 
     static final class MountInstance implements AutoCloseable {
index 03aa31680b97813ef8f6e5a80ee93b5a0d52e6fc..589f9cd662bf7f9ea8d456fba38ba7925595d84e 100644 (file)
@@ -8,13 +8,13 @@
 package org.opendaylight.controller.sal.rest.impl;
 
 import com.google.common.base.Charsets;
+import com.google.gson.stream.JsonWriter;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Type;
 import java.net.URI;
-import java.util.Iterator;
 import javax.ws.rs.Produces;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.MediaType;
@@ -34,7 +34,9 @@ import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactory;
 import org.opendaylight.yangtools.yang.data.codec.gson.JSONNormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.data.codec.gson.JsonWriterFactory;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -60,45 +62,66 @@ public class NormalizedNodeJsonBodyWriter implements MessageBodyWriter<Normalize
             final MediaType mediaType, final MultivaluedMap<String, Object> httpHeaders, final OutputStream entityStream)
                     throws IOException, WebApplicationException {
         NormalizedNode<?, ?> data = t.getData();
-        final InstanceIdentifierContext context = t.getInstanceIdentifierContext();
-        final DataSchemaNode schema = context.getSchemaNode();
-        SchemaPath path = context.getSchemaNode().getPath();
-        final OutputStreamWriter outputWriter = new OutputStreamWriter(entityStream, Charsets.UTF_8);
         if (data == null) {
             throw new RestconfDocumentedException(Response.Status.NOT_FOUND);
         }
 
+        final InstanceIdentifierContext context = t.getInstanceIdentifierContext();
+
+        SchemaPath path = context.getSchemaNode().getPath();
         boolean isDataRoot = false;
-        URI initialNs = null;
         if (SchemaPath.ROOT.equals(path)) {
             isDataRoot = true;
         } else {
             path = path.getParent();
             // FIXME: Add proper handling of reading root.
         }
-        if(!schema.isAugmenting() && !(schema instanceof SchemaContext)) {
-            initialNs = schema.getQName().getNamespace();
-        }
-        final NormalizedNodeStreamWriter jsonWriter = JSONNormalizedNodeStreamWriter.create(context.getSchemaContext(),path,initialNs,outputWriter);
-        final NormalizedNodeWriter nnWriter = NormalizedNodeWriter.forStreamWriter(jsonWriter);
+
+        JsonWriter jsonWriter = createJsonWriter(entityStream);
+        NormalizedNodeWriter nnWriter = createNormalizedNodeWriter(context,path,jsonWriter);
+
+        jsonWriter.beginObject();
         if(isDataRoot) {
-            writeDataRoot(outputWriter,nnWriter,(ContainerNode) data);
+            writeDataRoot(nnWriter,(ContainerNode) data);
         } else {
             if(data instanceof MapEntryNode) {
                 data = ImmutableNodes.mapNodeBuilder(data.getNodeType()).withChild(((MapEntryNode) data)).build();
             }
             nnWriter.write(data);
         }
+
         nnWriter.flush();
-        outputWriter.flush();
+        jsonWriter.endObject();
+        jsonWriter.flush();
+    }
+
+    private NormalizedNodeWriter createNormalizedNodeWriter(InstanceIdentifierContext context, SchemaPath path, JsonWriter jsonWriter) {
+
+        final DataSchemaNode schema = context.getSchemaNode();
+        final JSONCodecFactory codecs = getCodecFactory(context);
+
+        URI initialNs = null;
+        if(!schema.isAugmenting() && !(schema instanceof SchemaContext)) {
+            initialNs = schema.getQName().getNamespace();
+        }
+        final NormalizedNodeStreamWriter streamWriter = JSONNormalizedNodeStreamWriter.createNestedWriter(codecs,path,initialNs,jsonWriter);
+        return NormalizedNodeWriter.forStreamWriter(streamWriter);
+    }
+
+    private JsonWriter createJsonWriter(OutputStream entityStream) {
+        // FIXME BUG-2153: Add pretty print support
+        return JsonWriterFactory.createJsonWriter(new OutputStreamWriter(entityStream, Charsets.UTF_8));
+
+    }
+
+    private JSONCodecFactory getCodecFactory(InstanceIdentifierContext context) {
+        // TODO: Performance: Cache JSON Codec factory and schema context
+        return JSONCodecFactory.create(context.getSchemaContext());
     }
 
-    private void writeDataRoot(final OutputStreamWriter outputWriter, final NormalizedNodeWriter nnWriter, final ContainerNode data) throws IOException {
-        final Iterator<DataContainerChild<? extends PathArgument, ?>> iterator = data.getValue().iterator();
-        while(iterator.hasNext()) {
-            final DataContainerChild<? extends PathArgument, ?> child = iterator.next();
+    private void writeDataRoot(final NormalizedNodeWriter nnWriter, final ContainerNode data) throws IOException {
+        for(DataContainerChild<? extends PathArgument, ?> child : data.getValue()) {
             nnWriter.write(child);
-            nnWriter.flush();
         }
     }