Merge "Bug 1817 - Have the md-sal-broker feature reference the yangmodels feature...
authorEd Warnicke <eaw@cisco.com>
Wed, 17 Sep 2014 20:59:53 +0000 (20:59 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 17 Sep 2014 20:59:53 +0000 (20:59 +0000)
36 files changed:
features/mdsal/pom.xml
features/netconf/src/main/resources/features.xml
opendaylight/archetypes/opendaylight-configfile-archetype/pom.xml
opendaylight/commons/opendaylight/pom.xml
opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/setenv
opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/etc/custom.properties
opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/InMemoryDataStoreWithExecutorServiceBenchmark.java
opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/FromSalConversionsUtils.java
opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/MDFlowMapping.java
opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/ToSalConversionsUtils.java
opendaylight/md-sal/compatibility/sal-compatibility/src/test/java/org/opendaylight/controller/sal/compatibility/test/FromSalConversionsUtilsTest.java [new file with mode: 0644]
opendaylight/md-sal/compatibility/sal-compatibility/src/test/java/org/opendaylight/controller/sal/compatibility/test/TestFromSalConversionsUtils.java
opendaylight/md-sal/compatibility/sal-compatibility/src/test/java/org/opendaylight/controller/sal/compatibility/test/TestToSalConversionsUtils.java
opendaylight/md-sal/compatibility/sal-compatibility/src/test/java/org/opendaylight/controller/sal/compatibility/test/ToSalConversionsUtilsTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockSnapshotStore.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/resources/application.conf
opendaylight/md-sal/sal-common-util/src/main/java/org/opendaylight/controller/md/sal/common/util/jmx/ThreadExecutorStatsMXBeanImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ResolveDataChangeEventsTask.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfSessionCapabilities.java
opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfSessionCapabilitiesTest.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/api/RestconfService.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/RestconfDocumentedExceptionMapper.java
opendaylight/netconf/netconf-it/pom.xml
opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java
opendaylight/netconf/netconf-it/src/test/resources/logback-test.xml
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java
opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/packet/ICMP.java
opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/packet/IPv4.java
opendaylight/sal/api/src/test/java/org/opendaylight/controller/sal/packet/ICMPTest.java
opendaylight/sal/api/src/test/java/org/opendaylight/controller/sal/packet/IPv4Test.java

index 960dfb37a176bbe0aeed7f03cc1019aff1b5994f..f45f680c3069032601bab3ef03196cc06150ec7d 100644 (file)
@@ -13,6 +13,7 @@
 
   <properties>
     <features.file>features.xml</features.file>
+    <org.json.version>20131018</org.json.version>
   </properties>
 
   <dependencies>
       <type>xml</type>
       <classifier>config</classifier>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller.samples</groupId>
+      <artifactId>clustering-it-config</artifactId>
+      <version>${mdsal.version}</version>
+      <type>xml</type>
+      <classifier>testmoduleshardconf</classifier>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller.samples</groupId>
+      <artifactId>clustering-it-config</artifactId>
+      <version>${mdsal.version}</version>
+      <type>xml</type>
+      <classifier>testmoduleconf</classifier>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-rest-docgen</artifactId>
index 743dae663e5f871e4d50f3e57ce86593e0742bcb..444f20865b565d48d1dd2e55d6b3362db051cf76 100644 (file)
@@ -11,8 +11,6 @@
     <feature version='${project.version}'>odl-netconf-mapping-api</feature>
     <feature version='${project.version}'>odl-netconf-util</feature>
     <feature version='${project.version}'>odl-netconf-impl</feature>
-    <feature version='${project.version}'>odl-netconf-tcp</feature>
-    <feature version='${project.version}'>odl-netconf-ssh</feature>
     <feature version='${project.version}'>odl-config-netconf-connector</feature>
     <feature version='${project.version}'>odl-netconf-netty-util</feature>
     <feature version='${project.version}'>odl-netconf-client</feature>
index 38c86164e9b88067847f0d8012965c0f3aba154c..56342218a0b8205555961832f60d3fca4846a809 100644 (file)
   <distributionManagement>
     <repository>
       <id>opendaylight-release</id>
-      <url>http://nexus.opendaylight.org/content/repositories/opendaylight.release/</url>
+      <url>${nexusproxy}/repositories/opendaylight.release/</url>
     </repository>
     <snapshotRepository>
       <id>opendaylight-snapshot</id>
-      <url>http://nexus.opendaylight.org/content/repositories/opendaylight.snapshot/</url>
+      <url>${nexusproxy}/repositories/opendaylight.snapshot/</url>
     </snapshotRepository>
     <site>
       <id>website</id>
-      <url>dav:http://nexus.opendaylight.org/content/sites/site/sal-parent</url>
+      <url>dav:${nexusproxy}/sites/site/sal-parent</url>
     </site>
   </distributionManagement>
 </project>
index 4240db939a604c44424ffe55323f8f52ab114a2d..e3ffbf356aeecb541b76da89a4c40f450016900d 100644 (file)
   <repositories>
 
     <!-- OpenDayLight Repo Mirror -->
+    <!-- NOTE: URLs need to be hardcoded in the repository section because we have
+         parent poms that do NOT exist in this project and thus need to be pulled
+         down from the repository. To override these URLs you should use the
+         mirror section in your local settings.xml file. -->
     <repository>
       <releases>
         <enabled>true</enabled>
       <url>http://nexus.opendaylight.org/content/repositories/opendaylight.snapshot/</url>
     </pluginRepository>
   </pluginRepositories>
+
+  <!-- distribution management only runs when you run mvn deploy
+       which is if you are deploying compiled artifacts to a
+       maven repository. In that case logic dictacts that you already
+       compiled and thus already have the necessary parent pom files
+       that do not exist in this project pulled down to your local
+       .m2. That way the variables can be resolved and artifacts can
+       be uploaded when running mvn deploy. -->
   <distributionManagement>
     <!-- OpenDayLight Released artifact -->
     <repository>
       <id>opendaylight-release</id>
-      <url>http://nexus.opendaylight.org/content/repositories/opendaylight.release/</url>
+      <url>${nexusproxy}/repositories/opendaylight.release/</url>
     </repository>
     <!-- OpenDayLight Snapshot artifact -->
     <snapshotRepository>
       <id>opendaylight-snapshot</id>
-      <url>http://nexus.opendaylight.org/content/repositories/opendaylight.snapshot/</url>
+      <url>${nexusproxy}/repositories/opendaylight.snapshot/</url>
     </snapshotRepository>
     <!-- Site deployment -->
     <site>
index 4f240447b44171475a3049db94a9e4d65705a1f3..947c65f6bd175a46f8c633d17d7bb305558438a7 100755 (executable)
 # export KARAF_ETC  # Karaf etc  folder
 # export KARAF_OPTS # Additional available Karaf options
 # export KARAF_DEBUG # Enable debug mode
-if [ "x$JAVA_MAX_PERM_MEM" == "x" ]; then
+if [ "x$JAVA_MAX_PERM_MEM" = "x" ]; then
     export JAVA_MAX_PERM_MEM="512m"
 fi
-if [ "x$JAVA_MAX_MEM" == "x" ]; then
+if [ "x$JAVA_MAX_MEM" = "x" ]; then
     export JAVA_MAX_MEM="2048m"
 fi
 
index e0e2759b370e00d68602a647b1258d5fa6e0aeb5..cdb65420135d1f71ebe1e66eb67dec8efda762dd 100644 (file)
@@ -127,3 +127,9 @@ java.util.logging.config.file=configuration/tomcat-logging.properties
 #Hosttracker hostsdb key scheme setting
 hosttracker.keyscheme=IP
 
+# LISP Flow Mapping configuration
+# Map-Register messages overwrite existing RLOC sets in EID-to-RLOC mappings
+lisp.mappingOverwrite = true
+# Enable the Solicit-Map-Request (SMR) mechanism
+lisp.smr = false
+
index 4b9d66f4f226c880f2f1bff53adcd4c55162302e..77a4966ec3c1a063d650e0c240018e70bd10af7d 100644 (file)
@@ -7,19 +7,21 @@
  */
 package org.opendaylight.controller.md.sal.dom.store.benchmark;
 
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
-import org.openjdk.jmh.annotations.Level;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.TearDown;
-import org.openjdk.jmh.annotations.Fork;
-import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.State;
 import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.Mode;
 import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
 
 /**
  * Benchmark for testing of performance of write operations for InMemoryDataStore. The instance
@@ -41,14 +43,15 @@ public class InMemoryDataStoreWithExecutorServiceBenchmark extends AbstractInMem
     private static final int MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE = 1000;
     private static final int MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE = 5000;
 
+    @Override
     @Setup(Level.Trial)
     public void setUp() throws Exception {
         final String name = "DS_BENCHMARK";
         final ExecutorService dataChangeListenerExecutor = SpecialExecutors.newBlockingBoundedFastThreadPool(
             MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE, MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE, name + "-DCL");
 
-        final ExecutorService domStoreExecutor = SpecialExecutors.newBoundedSingleThreadExecutor(
-            MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE, "DOMStore-" + name );
+        final ListeningExecutorService domStoreExecutor = MoreExecutors.listeningDecorator(SpecialExecutors.newBoundedSingleThreadExecutor(
+            MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE, "DOMStore-" + name ));
 
         domStore = new InMemoryDOMDataStore(name, domStoreExecutor,
             dataChangeListenerExecutor);
@@ -57,6 +60,7 @@ public class InMemoryDataStoreWithExecutorServiceBenchmark extends AbstractInMem
         initTestNode();
     }
 
+    @Override
     @TearDown
     public void tearDown() {
         schemaContext = null;
index 1b648dc98c36c0c6e05bfce6740b294dd730e8b8..ecf1a94c18c8123dbcffe5c9d398ce69566f356d 100644 (file)
@@ -61,10 +61,16 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026
 
 import com.google.common.net.InetAddresses;
 
-public class FromSalConversionsUtils {
+/**
+ * MD-SAL to AD-SAL conversions collection
+ */
+public final class FromSalConversionsUtils {
 
-    private FromSalConversionsUtils() {
+    /** http://en.wikipedia.org/wiki/IPv4#Packet_structure (end of octet number 1, bit 14.+15.) */
+    public static final int ENC_FIELD_BIT_SIZE = 2;
 
+    private FromSalConversionsUtils() {
+        throw new IllegalAccessError("forcing no instance for factory");
     }
 
     @SuppressWarnings("unused")
@@ -469,5 +475,12 @@ public class FromSalConversionsUtils {
         return true;
     }
 
+    /**
+     * @param nwDscp NW-DSCP
+     * @return shifted to NW-TOS (with empty ECN part)
+     */
+    public static int dscpToTos(int nwDscp) {
+        return (short) (nwDscp << ENC_FIELD_BIT_SIZE);
+    }
 
 }
index 5837e35b3a65b7bb4a7b8fc7c0ae2517cbe07db6..00511bc74449adfac7c1d1f3bb0cc968ecb95162 100644 (file)
@@ -315,7 +315,7 @@ public final class MDFlowMapping {
 
     private static SetNwTosActionCase _toAction(final SetNwTos sourceAction) {
         return new SetNwTosActionCaseBuilder()
-        .setSetNwTosAction(new SetNwTosActionBuilder().setTos(sourceAction.getNwTos()).build())
+        .setSetNwTosAction(new SetNwTosActionBuilder().setTos(FromSalConversionsUtils.dscpToTos(sourceAction.getNwTos())).build())
         .build();
     }
 
index 28dd57c3b7986fecabae2c2fa9749e8d58fbd36e..dcc1a4660b5b71690419f377c84852f192b3c0dc 100644 (file)
@@ -128,7 +128,7 @@ public class ToSalConversionsUtils {
     private static final Logger LOG = LoggerFactory.getLogger(ToSalConversionsUtils.class);
 
     private ToSalConversionsUtils() {
-
+        throw new IllegalAccessError("forcing no instance for factory");
     }
 
     public static Flow toFlow(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow source, Node node) {
@@ -287,7 +287,7 @@ public class ToSalConversionsUtils {
             } else if (sourceAction instanceof SetNwTosActionCase) {
                 Integer tos = ((SetNwTosActionCase) sourceAction).getSetNwTosAction().getTos();
                 if (tos != null) {
-                    targetAction.add(new SetNwTos(tos));
+                    targetAction.add(new SetNwTos(ToSalConversionsUtils.tosToNwDscp(tos)));
                 }
             } else if (sourceAction instanceof SetTpDstActionCase) {
                 PortNumber port = ((SetTpDstActionCase) sourceAction).getSetTpDstAction().getPort();
@@ -643,4 +643,12 @@ public class ToSalConversionsUtils {
 
         return mac;
     }
+
+    /**
+     * @param nwTos NW-TOS
+     * @return shifted to NW-DSCP
+     */
+    public static int tosToNwDscp(int nwTos) {
+        return (short) (nwTos >>> FromSalConversionsUtils.ENC_FIELD_BIT_SIZE);
+    }
 }
diff --git a/opendaylight/md-sal/compatibility/sal-compatibility/src/test/java/org/opendaylight/controller/sal/compatibility/test/FromSalConversionsUtilsTest.java b/opendaylight/md-sal/compatibility/sal-compatibility/src/test/java/org/opendaylight/controller/sal/compatibility/test/FromSalConversionsUtilsTest.java
new file mode 100644 (file)
index 0000000..b09e816
--- /dev/null
@@ -0,0 +1,31 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.compatibility.test;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.sal.compatibility.FromSalConversionsUtils;
+
+/**
+ * test of {@link FromSalConversionsUtils}
+ */
+public class FromSalConversionsUtilsTest {
+
+    /**
+     * Test method for {@link org.opendaylight.controller.sal.compatibility.FromSalConversionsUtils#dscpToTos(int)}.
+     */
+    @Test
+    public void testDscpToTos() {
+        Assert.assertEquals(0, FromSalConversionsUtils.dscpToTos(0));
+        Assert.assertEquals(4, FromSalConversionsUtils.dscpToTos(1));
+        Assert.assertEquals(252, FromSalConversionsUtils.dscpToTos(63));
+        Assert.assertEquals(256, FromSalConversionsUtils.dscpToTos(64));
+        Assert.assertEquals(-4, FromSalConversionsUtils.dscpToTos(-1));
+    }
+
+}
index 9f787b7e391010cee640d6b0582c4e0447ded4c2..98df90112deedfaa0815031802242bfae503784a 100644 (file)
@@ -293,7 +293,7 @@ public class TestFromSalConversionsUtils {
                     }
                     assertTrue("Ipv4 address wasn't found.", ipv4AddressFound);
                 } else if (innerAction instanceof SetNwTosActionCase) {
-                    assertEquals("Wrong TOS in SetNwTosAction.", (Integer) 63, ((SetNwTosActionCase) innerAction).getSetNwTosAction().getTos());
+                    assertEquals("Wrong TOS in SetNwTosAction.", (Integer) 252, ((SetNwTosActionCase) innerAction).getSetNwTosAction().getTos());
                 } else if (innerAction instanceof SetNwDstActionCase) {
                     Address address = ((SetNwDstActionCase) innerAction).getSetNwDstAction().getAddress();
                     boolean ipv4AddressFound = false;
index 60b77394c1f15e8055d93afa259862dccd33c5a3..16d0bb424d02746921d16f5c321915232f5219c8 100644 (file)
@@ -499,7 +499,7 @@ public class TestToSalConversionsUtils {
 
     private void prepareActionSetNwTos(SetNwTosActionCaseBuilder wrapper) {
         SetNwTosActionBuilder setNwTosActionBuilder = new SetNwTosActionBuilder();
-        setNwTosActionBuilder.setTos(63);
+        setNwTosActionBuilder.setTos(252);
         wrapper.setSetNwTosAction(setNwTosActionBuilder.build());
     }
 
diff --git a/opendaylight/md-sal/compatibility/sal-compatibility/src/test/java/org/opendaylight/controller/sal/compatibility/test/ToSalConversionsUtilsTest.java b/opendaylight/md-sal/compatibility/sal-compatibility/src/test/java/org/opendaylight/controller/sal/compatibility/test/ToSalConversionsUtilsTest.java
new file mode 100644 (file)
index 0000000..aa25c18
--- /dev/null
@@ -0,0 +1,31 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.compatibility.test;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.sal.compatibility.ToSalConversionsUtils;
+
+/**
+ * test of {@link ToSalConversionsUtils}
+ */
+public class ToSalConversionsUtilsTest {
+
+    /**
+     * Test method for {@link org.opendaylight.controller.sal.compatibility.ToSalConversionsUtils#tosToNwDscp(int)}.
+     */
+    @Test
+    public void testTosToNwDscp() {
+        Assert.assertEquals(0, ToSalConversionsUtils.tosToNwDscp(0));
+        Assert.assertEquals(0, ToSalConversionsUtils.tosToNwDscp(1));
+        Assert.assertEquals(1, ToSalConversionsUtils.tosToNwDscp(4));
+        Assert.assertEquals(63, ToSalConversionsUtils.tosToNwDscp(252));
+        Assert.assertEquals(63, ToSalConversionsUtils.tosToNwDscp(253));
+        Assert.assertEquals(-1, ToSalConversionsUtils.tosToNwDscp(-1));
+    }
+}
index 978ea91089dbcb1a530c9d66f6878ffa06579e2d..cb51a8951a54f158bac9b7bf1cab769662568fc3 100644 (file)
@@ -7,7 +7,6 @@ import org.opendaylight.controller.cluster.example.messages.PrintRole;
 import org.opendaylight.controller.cluster.example.messages.PrintState;
 import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
-import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
 
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
@@ -196,11 +195,6 @@ public class TestDriver {
 
         actorSystem.stop(actorRef);
         actorRefs.remove(actorName);
-
-        for (ActorRef actor : actorRefs.values()) {
-            actor.tell(new RemoveRaftPeer(actorName), null);
-        }
-
         allPeers.remove(actorName);
     }
 
@@ -209,11 +203,6 @@ public class TestDriver {
         allPeers.put(actorName, address);
 
         ActorRef exampleActor = createExampleActor(actorName);
-
-        for (ActorRef actor : actorRefs.values()) {
-            actor.tell(new AddRaftPeer(actorName, address), null);
-        }
-
         actorRefs.put(actorName, exampleActor);
 
         addClientsToNode(actorName, 1);
index 91bbeeca504b607147e39927dae1481ee9c4df6e..8270f2949a67cc9fc00f5180dce41872ca6a8a47 100644 (file)
@@ -96,7 +96,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
      * This context should NOT be passed directly to any other actor it is
      * only to be consumed by the RaftActorBehaviors
      */
-    private RaftActorContext context;
+    protected RaftActorContext context;
 
     /**
      * The in-memory journal
@@ -134,6 +134,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
             context.setReplicatedLog(replicatedLog);
             context.setLastApplied(snapshot.getLastAppliedIndex());
+            context.setCommitIndex(snapshot.getLastAppliedIndex());
 
             LOG.info("Applied snapshot to replicatedLog. " +
                     "snapshotIndex={}, snapshotTerm={}, journal-size={}",
@@ -152,21 +153,22 @@ public abstract class RaftActor extends UntypedPersistentActor {
             applyState(null, "recovery", logEntry.getData());
             context.setLastApplied(logEntry.getIndex());
             context.setCommitIndex(logEntry.getIndex());
+
         } else if (message instanceof DeleteEntries) {
             replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+
         } else if (message instanceof UpdateElectionTerm) {
-            context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor());
+            context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
+                ((UpdateElectionTerm) message).getVotedFor());
+
         } else if (message instanceof RecoveryCompleted) {
-            if(LOG.isDebugEnabled()) {
-                LOG.debug(
-                    "RecoveryCompleted - Switching actor to Follower - " +
-                        "Persistence Id =  " + persistenceId() +
-                        " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
-                        "journal-size={}",
-                    replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
-                    replicatedLog.snapshotTerm, replicatedLog.size()
-                );
-            }
+            LOG.info(
+                "RecoveryCompleted - Switching actor to Follower - " +
+                    "Persistence Id =  " + persistenceId() +
+                    " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
+                    "journal-size={}",
+                replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
+                replicatedLog.snapshotTerm, replicatedLog.size());
             currentBehavior = switchBehavior(RaftState.Follower);
             onStateChanged();
         }
index 12123db12995061901a39a264c79f0237d78d00a..9b099c2abac8223529b750c6ea906925105ec487 100644 (file)
@@ -2,18 +2,24 @@ package org.opendaylight.controller.cluster.raft;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.event.Logging;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
 import com.google.protobuf.ByteString;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
+import static junit.framework.Assert.assertTrue;
 import static junit.framework.TestCase.assertEquals;
 
 public class RaftActorTest extends AbstractActorTest {
@@ -21,11 +27,21 @@ public class RaftActorTest extends AbstractActorTest {
 
     public static class MockRaftActor extends RaftActor {
 
+        boolean applySnapshotCalled = false;
+
         public MockRaftActor(String id,
             Map<String, String> peerAddresses) {
             super(id, peerAddresses);
         }
 
+        public RaftActorContext getRaftActorContext() {
+            return context;
+        }
+
+        public boolean isApplySnapshotCalled() {
+            return applySnapshotCalled;
+        }
+
         public static Props props(final String id, final Map<String, String> peerAddresses){
             return Props.create(new Creator<MockRaftActor>(){
 
@@ -45,7 +61,7 @@ public class RaftActorTest extends AbstractActorTest {
         }
 
         @Override protected void applySnapshot(ByteString snapshot) {
-            throw new UnsupportedOperationException("applySnapshot");
+           applySnapshotCalled = true;
         }
 
         @Override protected void onStateChanged() {
@@ -134,5 +150,56 @@ public class RaftActorTest extends AbstractActorTest {
         kit.findLeader(kit.getRaftActor().path().toString());
     }
 
+    @Test
+    public void testActorRecovery() {
+        new JavaTestKit(getSystem()) {{
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    String persistenceId = "follower10";
+
+                    ActorRef followerActor = getSystem().actorOf(
+                        MockRaftActor.props(persistenceId, Collections.EMPTY_MAP), persistenceId);
+
+
+                    List<ReplicatedLogEntry> entries = new ArrayList<>();
+                    ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("E"));
+                    ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("F"));
+                    entries.add(entry1);
+                    entries.add(entry2);
+
+                    int lastApplied = 3;
+                    int lastIndex = 5;
+                    Snapshot snapshot = Snapshot.create("A B C D".getBytes(), entries, lastIndex, 1 , lastApplied, 1);
+                    MockSnapshotStore.setMockSnapshot(snapshot);
+                    MockSnapshotStore.setPersistenceId(persistenceId);
+
+                    followerActor.tell(PoisonPill.getInstance(), null);
+                    try {
+                        // give some time for actor to die
+                        Thread.sleep(200);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+
+                    TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, Collections.EMPTY_MAP));
+                    try {
+                        //give some time for snapshot offer to get called.
+                        Thread.sleep(200);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                    RaftActorContext context = ref.underlyingActor().getRaftActorContext();
+                    assertEquals(entries.size(), context.getReplicatedLog().size());
+                    assertEquals(lastApplied, context.getLastApplied());
+                    assertEquals(lastApplied, context.getCommitIndex());
+                    assertTrue(ref.underlyingActor().isApplySnapshotCalled());
+                }
+
+            };
+        }};
+
+    }
+
 
 }
index 227d1effa7e9b8b3bb65932fe8c25b1a2eecdbf5..fd4a75a22f735c06d2ab8042eb13d6e92de572a0 100644 (file)
@@ -3,6 +3,8 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
+import com.google.protobuf.ByteString;
 import junit.framework.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
@@ -10,19 +12,35 @@ import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
-
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
+import static akka.pattern.Patterns.ask;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
@@ -34,8 +52,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         return new Follower(actorContext);
     }
 
-    @Override protected RaftActorContext createActorContext() {
-        return new MockRaftActorContext("test", getSystem(), followerActor);
+    @Override protected  RaftActorContext createActorContext() {
+        return createActorContext(followerActor);
+    }
+
+    protected  RaftActorContext createActorContext(ActorRef actorRef){
+        return new MockRaftActorContext("test", getSystem(), actorRef);
     }
 
     @Test
@@ -158,13 +180,14 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 createActorContext();
 
             context.setLastApplied(100);
-            setLastLogEntry((MockRaftActorContext) context, 1, 100, new MockRaftActorContext.MockPayload(""));
+            setLastLogEntry((MockRaftActorContext) context, 1, 100,
+                new MockRaftActorContext.MockPayload(""));
             ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99);
 
             List<ReplicatedLogEntry> entries =
                 Arrays.asList(
-                    (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101,
-                        new MockRaftActorContext.MockPayload("foo"))
+                        (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101,
+                                new MockRaftActorContext.MockPayload("foo"))
                 );
 
             // The new commitIndex is 101
@@ -409,4 +432,148 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         }};
     }
 
+
+    /**
+     * This test verifies that when InstallSnapshot is received by
+     * the follower its applied correctly.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testHandleInstallSnapshot() throws Exception {
+        JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {{
+
+            ActorRef leaderActor = getSystem().actorOf(Props.create(
+                MessageCollectorActor.class));
+
+            MockRaftActorContext context = (MockRaftActorContext)
+                createActorContext(getRef());
+
+            Follower follower = (Follower)createBehavior(context);
+
+            HashMap<String, String> followerSnapshot = new HashMap<>();
+            followerSnapshot.put("1", "A");
+            followerSnapshot.put("2", "B");
+            followerSnapshot.put("3", "C");
+
+            ByteString bsSnapshot  = toByteString(followerSnapshot);
+            ByteString chunkData = ByteString.EMPTY;
+            int offset = 0;
+            int snapshotLength = bsSnapshot.size();
+            int i = 1;
+
+            do {
+                chunkData = getNextChunk(bsSnapshot, offset);
+                final InstallSnapshot installSnapshot =
+                    new InstallSnapshot(1, "leader-1", i, 1,
+                        chunkData, i, 3);
+                follower.handleMessage(leaderActor, installSnapshot);
+                offset = offset + 50;
+                i++;
+            } while ((offset+50) < snapshotLength);
+
+            final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, 3, 3);
+            follower.handleMessage(leaderActor, installSnapshot3);
+
+            String[] matches = new ReceiveWhile<String>(String.class, duration("2 seconds")) {
+                @Override
+                protected String match(Object o) throws Exception {
+                    if (o instanceof ApplySnapshot) {
+                        ApplySnapshot as = (ApplySnapshot)o;
+                        if (as.getSnapshot().getLastIndex() != installSnapshot3.getLastIncludedIndex()) {
+                            return "applySnapshot-lastIndex-mismatch";
+                        }
+                        if (as.getSnapshot().getLastAppliedTerm() != installSnapshot3.getLastIncludedTerm()) {
+                            return "applySnapshot-lastAppliedTerm-mismatch";
+                        }
+                        if (as.getSnapshot().getLastAppliedIndex() != installSnapshot3.getLastIncludedIndex()) {
+                            return "applySnapshot-lastAppliedIndex-mismatch";
+                        }
+                        if (as.getSnapshot().getLastTerm() != installSnapshot3.getLastIncludedTerm()) {
+                            return "applySnapshot-lastTerm-mismatch";
+                        }
+                        return "applySnapshot";
+                    }
+
+                    return "ignoreCase";
+                }
+            }.get();
+
+            String applySnapshotMatch = "";
+            for (String reply: matches) {
+                if (reply.startsWith("applySnapshot")) {
+                    applySnapshotMatch = reply;
+                }
+            }
+
+            assertEquals("applySnapshot", applySnapshotMatch);
+
+            Object messages = executeLocalOperation(leaderActor, "get-all-messages");
+
+            assertNotNull(messages);
+            assertTrue(messages instanceof List);
+            List<Object> listMessages = (List<Object>) messages;
+
+            int installSnapshotReplyReceivedCount = 0;
+            for (Object message: listMessages) {
+                if (message instanceof InstallSnapshotReply) {
+                    ++installSnapshotReplyReceivedCount;
+                }
+            }
+
+            assertEquals(3, installSnapshotReplyReceivedCount);
+
+        }};
+    }
+
+    public Object executeLocalOperation(ActorRef actor, Object message) throws Exception {
+        FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
+        Timeout operationTimeout = new Timeout(operationDuration);
+        Future<Object> future = ask(actor, message, operationTimeout);
+
+        try {
+            return Await.result(future, operationDuration);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    public ByteString getNextChunk (ByteString bs, int offset){
+        int snapshotLength = bs.size();
+        int start = offset;
+        int size = 50;
+        if (50 > snapshotLength) {
+            size = snapshotLength;
+        } else {
+            if ((start + 50) > snapshotLength) {
+                size = snapshotLength - start;
+            }
+        }
+        return bs.substring(start, start + size);
+    }
+
+    private ByteString toByteString(Map<String, String> state) {
+        ByteArrayOutputStream b = null;
+        ObjectOutputStream o = null;
+        try {
+            try {
+                b = new ByteArrayOutputStream();
+                o = new ObjectOutputStream(b);
+                o.writeObject(state);
+                byte[] snapshotBytes = b.toByteArray();
+                return ByteString.copyFrom(snapshotBytes);
+            } finally {
+                if (o != null) {
+                    o.flush();
+                    o.close();
+                }
+                if (b != null) {
+                    b.close();
+                }
+            }
+        } catch (IOException e) {
+            org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
+        }
+        return null;
+    }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java
new file mode 100644 (file)
index 0000000..88eecfe
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.utils;
+
+import akka.actor.UntypedActor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class MessageCollectorActor extends UntypedActor {
+    private List<Object> messages = new ArrayList<>();
+
+    @Override public void onReceive(Object message) throws Exception {
+        if(message instanceof String){
+            if("get-all-messages".equals(message)){
+                getSender().tell(messages, getSelf());
+            }
+        } else {
+            messages.add(message);
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockSnapshotStore.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockSnapshotStore.java
new file mode 100644 (file)
index 0000000..d70bf92
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.utils;
+
+import akka.dispatch.Futures;
+import akka.japi.Option;
+import akka.persistence.SelectedSnapshot;
+import akka.persistence.SnapshotMetadata;
+import akka.persistence.SnapshotSelectionCriteria;
+import akka.persistence.snapshot.japi.SnapshotStore;
+import org.opendaylight.controller.cluster.raft.Snapshot;
+import scala.concurrent.Future;
+
+
+public class MockSnapshotStore  extends SnapshotStore {
+
+    private static Snapshot mockSnapshot;
+    private static String persistenceId;
+
+    public static void setMockSnapshot(Snapshot s) {
+        mockSnapshot = s;
+    }
+
+    public static void setPersistenceId(String pId) {
+        persistenceId = pId;
+    }
+
+    @Override
+    public Future<Option<SelectedSnapshot>> doLoadAsync(String s, SnapshotSelectionCriteria snapshotSelectionCriteria) {
+        if (mockSnapshot == null) {
+            return Futures.successful(Option.<SelectedSnapshot>none());
+        }
+
+        SnapshotMetadata smd = new SnapshotMetadata(persistenceId, 1, 12345);
+        SelectedSnapshot selectedSnapshot =
+            new SelectedSnapshot(smd, mockSnapshot);
+        return Futures.successful(Option.some(selectedSnapshot));
+    }
+
+    @Override
+    public Future<Void> doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) {
+        return null;
+    }
+
+    @Override
+    public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception {
+
+    }
+
+    @Override
+    public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception {
+
+    }
+
+    @Override
+    public void doDelete(String s, SnapshotSelectionCriteria snapshotSelectionCriteria) throws Exception {
+
+    }
+}
index 2b753004c48265620628fdbc58a1e41a96a65c51..6b2cc2203844198bc546ab509695d3c134a35a8b 100644 (file)
@@ -1,4 +1,6 @@
 akka {
+    persistence.snapshot-store.plugin = "mock-snapshot-store"
+
     loglevel = "DEBUG"
     loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"]
 
@@ -19,3 +21,10 @@ akka {
         }
     }
 }
+
+mock-snapshot-store {
+  # Class name of the plugin.
+  class = "org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore"
+  # Dispatcher for the plugin actor.
+  plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+}
index 58677103c2df020fd7ac3fdaaa38353a59a328a4..3de49ae296f391bfa9b33afb3524a3ed4e0cc3ae 100644 (file)
@@ -44,25 +44,47 @@ public class ThreadExecutorStatsMXBeanImpl extends AbstractMXBean
         this.executor = Preconditions.checkNotNull(executor);
     }
 
+    private static ThreadExecutorStatsMXBeanImpl createInternal(final Executor executor,
+            final String mBeanName, final String mBeanType, final String mBeanCategory) {
+        if (executor instanceof ThreadPoolExecutor) {
+            final ThreadExecutorStatsMXBeanImpl ret = new ThreadExecutorStatsMXBeanImpl(
+                    (ThreadPoolExecutor) executor, mBeanName, mBeanType, mBeanCategory);
+            return ret;
+        }
+
+        LOG.info("Executor {} is not supported", executor);
+        return null;
+    }
+
     /**
-     * Create a new bean for the statistics, which is already registered.
+     * Creates a new bean if the backing executor is a ThreadPoolExecutor and registers it.
      *
-     * @param executor
-     * @param mBeanName
-     * @param mBeanType
-     * @param mBeanCategory
-     * @return
+     * @param executor the backing {@link Executor}
+     * @param mBeanName Used as the <code>name</code> property in the bean's ObjectName.
+     * @param mBeanType Used as the <code>type</code> property in the bean's ObjectName.
+     * @param mBeanCategory Used as the <code>Category</code> property in the bean's ObjectName.
+     * @return a registered ThreadExecutorStatsMXBeanImpl instance if the backing executor
+     *         is a ThreadPoolExecutor, otherwise null.
      */
     public static ThreadExecutorStatsMXBeanImpl create(final Executor executor, final String mBeanName,
             final String mBeanType, @Nullable final String mBeanCategory) {
-        if (executor instanceof ThreadPoolExecutor) {
-            final ThreadExecutorStatsMXBeanImpl ret = new ThreadExecutorStatsMXBeanImpl((ThreadPoolExecutor) executor, mBeanName, mBeanType, mBeanCategory);
+        ThreadExecutorStatsMXBeanImpl ret = createInternal(executor, mBeanName, mBeanType, mBeanCategory);
+        if(ret != null) {
             ret.registerMBean();
-            return ret;
         }
 
-        LOG.info("Executor {} is not supported", executor);
-        return null;
+        return ret;
+    }
+
+    /**
+     * Creates a new bean if the backing executor is a ThreadPoolExecutor.
+     *
+     * @param executor the backing {@link Executor}
+     * @return a ThreadExecutorStatsMXBeanImpl instance if the backing executor
+     *         is a ThreadPoolExecutor, otherwise null.
+     */
+    public static ThreadExecutorStatsMXBeanImpl create(final Executor executor) {
+        return createInternal(executor, "", "", null);
     }
 
     @Override
index 74a91d08cf4373918f069cc3cae44b665c146a9d..0959c2a95949a6332fd66859f0796e103746c5d8 100644 (file)
@@ -74,16 +74,14 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
     }
 
     public void setDataStoreExecutor(ExecutorService dsExecutor) {
-        this.dataStoreExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(dsExecutor,
-                "notification-executor", getMBeanType(), getMBeanCategory());
+        this.dataStoreExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(dsExecutor);
     }
 
     public void setNotificationManager(QueuedNotificationManager<?, ?> manager) {
         this.notificationManagerStatsBean = new QueuedNotificationManagerMXBeanImpl(manager,
                 "notification-manager", getMBeanType(), getMBeanCategory());
 
-        this.notificationExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(manager.getExecutor(),
-                "data-store-executor", getMBeanType(), getMBeanCategory());
+        this.notificationExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(manager.getExecutor());
     }
 
     @Override
@@ -230,7 +228,8 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
 
     @Override
     public ThreadExecutorStats getDataStoreExecutorStats() {
-        return dataStoreExecutorStatsBean.toThreadExecutorStats();
+        return dataStoreExecutorStatsBean == null ? null :
+                                        dataStoreExecutorStatsBean.toThreadExecutorStats();
     }
 
     @Override
index 5fe9866b12ed0ec6e2a669de4cc7bd3b9957b641..25ddbf5df25dc065ba11bcc64cdf059cfbacb14c 100644 (file)
@@ -11,10 +11,8 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
-
 import java.util.Collection;
 import java.util.Map.Entry;
-
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder;
 import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.SimpleEventFactory;
@@ -120,7 +118,9 @@ final class ResolveDataChangeEventsTask {
             Preconditions.checkArgument(node.getDataAfter().isPresent(),
                     "Modification at {} has type {} but no after-data", state.getPath(), node.getModificationType());
             if (!node.getDataBefore().isPresent()) {
-                resolveCreateEvent(state, node.getDataAfter().get());
+                @SuppressWarnings({ "unchecked", "rawtypes" })
+                final NormalizedNode<PathArgument, ?> afterNode = (NormalizedNode)node.getDataAfter().get();
+                resolveSameEventRecursivelly(state, afterNode, DOMImmutableDataChangeEvent.getCreateEventFactory());
                 return true;
             }
 
@@ -128,7 +128,10 @@ final class ResolveDataChangeEventsTask {
         case DELETE:
             Preconditions.checkArgument(node.getDataBefore().isPresent(),
                     "Modification at {} has type {} but no before-data", state.getPath(), node.getModificationType());
-            resolveDeleteEvent(state, node.getDataBefore().get());
+
+            @SuppressWarnings({ "unchecked", "rawtypes" })
+            final NormalizedNode<PathArgument, ?> beforeNode = (NormalizedNode)node.getDataBefore().get();
+            resolveSameEventRecursivelly(state, beforeNode, DOMImmutableDataChangeEvent.getRemoveEventFactory());
             return true;
         case UNMODIFIED:
             return false;
@@ -223,26 +226,6 @@ final class ResolveDataChangeEventsTask {
         return true;
     }
 
-    /**
-     * Resolves create events deep down the interest listener tree.
-     *
-     * @param path
-     * @param listeners
-     * @param afterState
-     * @return
-     */
-    private void resolveCreateEvent(final ResolveDataChangeState state, final NormalizedNode<?, ?> afterState) {
-        @SuppressWarnings({ "unchecked", "rawtypes" })
-        final NormalizedNode<PathArgument, ?> node = (NormalizedNode) afterState;
-        resolveSameEventRecursivelly(state, node, DOMImmutableDataChangeEvent.getCreateEventFactory());
-    }
-
-    private void resolveDeleteEvent(final ResolveDataChangeState state, final NormalizedNode<?, ?> beforeState) {
-        @SuppressWarnings({ "unchecked", "rawtypes" })
-        final NormalizedNode<PathArgument, ?> node = (NormalizedNode) beforeState;
-        resolveSameEventRecursivelly(state, node, DOMImmutableDataChangeEvent.getRemoveEventFactory());
-    }
-
     private void resolveSameEventRecursivelly(final ResolveDataChangeState state,
             final NormalizedNode<PathArgument, ?> node, final SimpleEventFactory eventFactory) {
         if (!state.needsProcessing()) {
@@ -277,6 +260,11 @@ final class ResolveDataChangeEventsTask {
         Preconditions.checkArgument(modification.getDataBefore().isPresent(), "Subtree change with before-data not present at path %s", state.getPath());
         Preconditions.checkArgument(modification.getDataAfter().isPresent(), "Subtree change with after-data not present at path %s", state.getPath());
 
+        if (!state.needsProcessing()) {
+            LOG.trace("Not processing modified subtree {}", state.getPath());
+            return true;
+        }
+
         DataChangeScope scope = null;
         for (DataTreeCandidateNode childMod : modification.getChildNodes()) {
             final ResolveDataChangeState childState = state.child(childMod.getIdentifier());
index 2642116927cde25de425d8de95884cb5d0ae3b57..09e178f5ceaa4e4709efa51dc9267a41f65b2d3f 100644 (file)
@@ -8,11 +8,10 @@ import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
-
+import java.net.URI;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
-
 import org.opendaylight.controller.netconf.client.NetconfClientSession;
 import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
 import org.opendaylight.yangtools.yang.common.QName;
@@ -119,10 +118,14 @@ public final class NetconfSessionCapabilities {
         return fromStrings(session.getServerCapabilities());
     }
 
-    private static final QName cachedQName(String namespace, String revision, String moduleName) {
+    private static QName cachedQName(final String namespace, final String revision, final String moduleName) {
         return QName.cachedReference(QName.create(namespace, revision, moduleName));
     }
 
+    private static QName cachedQName(final String namespace, final String moduleName) {
+        return QName.cachedReference(QName.create(URI.create(namespace), null, moduleName).withoutRevision());
+    }
+
     public static NetconfSessionCapabilities fromStrings(final Collection<String> capabilities) {
         final Set<QName> moduleBasedCaps = new HashSet<>();
         final Set<String> nonModuleCaps = Sets.newHashSet(capabilities);
@@ -142,8 +145,7 @@ public final class NetconfSessionCapabilities {
 
             String revision = REVISION_PARAM.from(queryParams);
             if (revision != null) {
-                moduleBasedCaps.add(cachedQName(namespace, revision, moduleName));
-                nonModuleCaps.remove(capability);
+                addModuleQName(moduleBasedCaps, nonModuleCaps, capability, cachedQName(namespace, revision, moduleName));
                 continue;
             }
 
@@ -151,21 +153,29 @@ public final class NetconfSessionCapabilities {
              * We have seen devices which mis-escape revision, but the revision may not
              * even be there. First check if there is a substring that matches revision.
              */
-            if (!Iterables.any(queryParams, CONTAINS_REVISION)) {
+            if (Iterables.any(queryParams, CONTAINS_REVISION)) {
+
+                LOG.debug("Netconf device was not reporting revision correctly, trying to get amp;revision=");
+                revision = BROKEN_REVISON_PARAM.from(queryParams);
+                if (revision == null) {
+                    LOG.warn("Netconf device returned revision incorrectly escaped for {}, ignoring it", capability);
+                    addModuleQName(moduleBasedCaps, nonModuleCaps, capability, cachedQName(namespace, moduleName));
+                } else {
+                    addModuleQName(moduleBasedCaps, nonModuleCaps, capability, cachedQName(namespace, revision, moduleName));
+                }
                 continue;
             }
 
-            LOG.debug("Netconf device was not reporting revision correctly, trying to get amp;revision=");
-            revision = BROKEN_REVISON_PARAM.from(queryParams);
-            if (revision == null) {
-                LOG.warn("Netconf device returned revision incorrectly escaped for {}, ignoring it", capability);
-            }
-
-            // FIXME: do we really want to continue here?
-            moduleBasedCaps.add(cachedQName(namespace, revision, moduleName));
-            nonModuleCaps.remove(capability);
+            // Fallback, no revision provided for module
+            addModuleQName(moduleBasedCaps, nonModuleCaps, capability, cachedQName(namespace, moduleName));
         }
 
         return new NetconfSessionCapabilities(ImmutableSet.copyOf(nonModuleCaps), ImmutableSet.copyOf(moduleBasedCaps));
     }
+
+
+    private static void addModuleQName(final Set<QName> moduleBasedCaps, final Set<String> nonModuleCaps, final String capability, final QName qName) {
+        moduleBasedCaps.add(qName);
+        nonModuleCaps.remove(capability);
+    }
 }
index 87947b57faf5b28bf840411503caf6b6353133ca..80bb08f5af399fee497465e8ad1e345726784b24 100644 (file)
@@ -43,6 +43,19 @@ public class NetconfSessionCapabilitiesTest {
         assertThat(merged.getNonModuleCaps(), JUnitMatchers.hasItem("urn:ietf:params:netconf:capability:rollback-on-error:1.0"));
     }
 
+    @Test
+    public void testCapabilityNoRevision() throws Exception {
+        final List<String> caps1 = Lists.newArrayList(
+                "namespace:2?module=module2",
+                "namespace:2?module=module2&amp;revision=2012-12-12",
+                "namespace:2?module=module1&amp;RANDOMSTRING;revision=2013-12-12",
+                "namespace:2?module=module2&amp;RANDOMSTRING;revision=2013-12-12" // This one should be ignored(same as first), since revision is in wrong format
+        );
+
+        final NetconfSessionCapabilities sessionCaps1 = NetconfSessionCapabilities.fromStrings(caps1);
+        assertCaps(sessionCaps1, 0, 3);
+    }
+
     private void assertCaps(final NetconfSessionCapabilities sessionCaps1, final int nonModuleCaps, final int moduleCaps) {
         assertEquals(nonModuleCaps, sessionCaps1.getNonModuleCaps().size());
         assertEquals(moduleCaps, sessionCaps1.getModuleBasedCaps().size());
index 4a46a3c26712c8e54da6650bd4e68b2de9ad91ca..7f8f0a1d0e32478d4b977cefab1398a252c52c2a 100644 (file)
@@ -60,31 +60,31 @@ public interface RestconfService {
 
     @GET
     @Path("/modules")
-    @Produces({ Draft02.MediaTypes.API + XML, Draft02.MediaTypes.API + JSON, MediaType.APPLICATION_JSON,
+    @Produces({ Draft02.MediaTypes.API + JSON, Draft02.MediaTypes.API + XML, MediaType.APPLICATION_JSON,
             MediaType.APPLICATION_XML, MediaType.TEXT_XML })
     public StructuredData getModules(@Context UriInfo uriInfo);
 
     @GET
     @Path("/modules/{identifier:.+}")
-    @Produces({ Draft02.MediaTypes.API + XML, Draft02.MediaTypes.API + JSON, MediaType.APPLICATION_JSON,
+    @Produces({ Draft02.MediaTypes.API + JSON, Draft02.MediaTypes.API + XML, MediaType.APPLICATION_JSON,
             MediaType.APPLICATION_XML, MediaType.TEXT_XML })
     public StructuredData getModules(@PathParam("identifier") String identifier, @Context UriInfo uriInfo);
 
     @GET
     @Path("/modules/module/{identifier:.+}")
-    @Produces({ Draft02.MediaTypes.API + XML, Draft02.MediaTypes.API + JSON, MediaType.APPLICATION_JSON,
+    @Produces({ Draft02.MediaTypes.API + JSON, Draft02.MediaTypes.API + XML, MediaType.APPLICATION_JSON,
             MediaType.APPLICATION_XML, MediaType.TEXT_XML })
     public StructuredData getModule(@PathParam("identifier") String identifier, @Context UriInfo uriInfo);
 
     @GET
     @Path("/operations")
-    @Produces({ Draft02.MediaTypes.API + XML, Draft02.MediaTypes.API + JSON, MediaType.APPLICATION_JSON,
+    @Produces({ Draft02.MediaTypes.API + JSON, Draft02.MediaTypes.API + XML, MediaType.APPLICATION_JSON,
             MediaType.APPLICATION_XML, MediaType.TEXT_XML })
     public StructuredData getOperations(@Context UriInfo uriInfo);
 
     @GET
     @Path("/operations/{identifier:.+}")
-    @Produces({ Draft02.MediaTypes.API + XML, Draft02.MediaTypes.API + JSON, MediaType.APPLICATION_JSON,
+    @Produces({ Draft02.MediaTypes.API + JSON, Draft02.MediaTypes.API + XML, MediaType.APPLICATION_JSON,
             MediaType.APPLICATION_XML, MediaType.TEXT_XML })
     public StructuredData getOperations(@PathParam("identifier") String identifier, @Context UriInfo uriInfo);
 
@@ -149,7 +149,7 @@ public interface RestconfService {
 
     @GET
     @Path("/streams")
-    @Produces({ Draft02.MediaTypes.API + XML, Draft02.MediaTypes.API + JSON, MediaType.APPLICATION_JSON,
+    @Produces({ Draft02.MediaTypes.API + JSON, Draft02.MediaTypes.API + XML, MediaType.APPLICATION_JSON,
             MediaType.APPLICATION_XML, MediaType.TEXT_XML })
     public StructuredData getAvailableStreams(@Context UriInfo uriInfo);
 
index 63a5b1b54055a81cd3f8f3f736365c4e99e1d8b8..10201ab6f5148c78480b9cceea34b77766f0027e 100644 (file)
@@ -78,16 +78,19 @@ public class RestconfDocumentedExceptionMapper implements ExceptionMapper<Restco
 
         LOG.debug("In toResponse: {}", exception.getMessage());
 
-        // Default to the content type if there's no Accept header
 
-        MediaType mediaType = headers.getMediaType();
 
         List<MediaType> accepts = headers.getAcceptableMediaTypes();
+        accepts.remove(MediaType.WILDCARD_TYPE);
 
         LOG.debug("Accept headers: {}", accepts);
 
+        final MediaType mediaType;
         if (accepts != null && accepts.size() > 0) {
             mediaType = accepts.get(0); // just pick the first one
+        } else {
+            // Default to the content type if there's no Accept header
+            mediaType = MediaType.APPLICATION_JSON_TYPE;
         }
 
         LOG.debug("Using MediaType: {}", mediaType);
index 272b686fc034ff33fbbe247b706ec536040660cb..3a70a399bb8f82f563eb5e9f7eaa252c9f28eeb0 100644 (file)
       <artifactId>config-util</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>sal-netconf-connector</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>netconf-api</artifactId>
index 4fe5f2a9504c60b8a448ff58feadbecb557799b3..bc8efbe91535573cc18bcd00d97e43dcf91abc2f 100644 (file)
@@ -10,27 +10,35 @@ package org.opendaylight.controller.netconf.it;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.channel.local.LocalAddress;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.concurrent.GlobalEventExecutor;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.opendaylight.controller.netconf.auth.AuthProvider;
 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
 import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
+import org.opendaylight.controller.netconf.client.NetconfClientSessionListener;
 import org.opendaylight.controller.netconf.client.SimpleNetconfClientSessionListener;
 import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
 import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder;
@@ -42,7 +50,15 @@ import org.opendaylight.controller.netconf.ssh.authentication.PEMGenerator;
 import org.opendaylight.controller.netconf.util.messages.NetconfMessageUtil;
 import org.opendaylight.controller.netconf.util.osgi.NetconfConfigUtil;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.controller.sal.connect.api.RemoteDevice;
+import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.protocol.framework.NeverReconnectStrategy;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.xml.sax.SAXException;
 
 public class NetconfITSecureTest extends AbstractNetconfConfigTest {
 
@@ -70,7 +86,7 @@ public class NetconfITSecureTest extends AbstractNetconfConfigTest {
     @Test
     public void testSecure() throws Exception {
         final NetconfClientDispatcher dispatch = new NetconfClientDispatcherImpl(getNettyThreadgroup(), getNettyThreadgroup(), getHashedWheelTimer());
-        try (TestingNetconfClient netconfClient = new TestingNetconfClient("testing-ssh-client", dispatch, getClientConfiguration())) {
+        try (TestingNetconfClient netconfClient = new TestingNetconfClient("testing-ssh-client", dispatch, getClientConfiguration(new SimpleNetconfClientSessionListener()))) {
             NetconfMessage response = netconfClient.sendMessage(getGetConfig());
             assertFalse("Unexpected error message " + XmlUtil.toString(response.getDocument()),
                     NetconfMessageUtil.isErrorMessage(response));
@@ -91,29 +107,42 @@ public class NetconfITSecureTest extends AbstractNetconfConfigTest {
     /**
      * Test all requests are handled properly and no mismatch occurs in listener
      */
-    @Test(timeout = 3*60*1000)
+    @Test(timeout = 5*60*1000)
     public void testSecureStress() throws Exception {
+        final int requests = 10000;
+
         final NetconfClientDispatcher dispatch = new NetconfClientDispatcherImpl(getNettyThreadgroup(), getNettyThreadgroup(), getHashedWheelTimer());
-        try (TestingNetconfClient netconfClient = new TestingNetconfClient("testing-ssh-client", dispatch, getClientConfiguration())) {
+        final NetconfDeviceCommunicator sessionListener = getSessionListener();
+        try (TestingNetconfClient netconfClient = new TestingNetconfClient("testing-ssh-client", dispatch, getClientConfiguration(sessionListener))) {
 
             final AtomicInteger responseCounter = new AtomicInteger(0);
-            final List<Future<?>> futures = Lists.newArrayList();
+            final List<ListenableFuture<RpcResult<NetconfMessage>>> futures = Lists.newArrayList();
 
-            final int requests = 1000;
             for (int i = 0; i < requests; i++) {
-                final Future<NetconfMessage> netconfMessageFuture = netconfClient.sendRequest(getGetConfig());
+                NetconfMessage getConfig = getGetConfig();
+                getConfig = changeMessageId(getConfig, i);
+                final ListenableFuture<RpcResult<NetconfMessage>> netconfMessageFuture = sessionListener.sendRequest(getConfig, QName.create("namespace", "2012-12-12", "get"));
                 futures.add(netconfMessageFuture);
-                netconfMessageFuture.addListener(new GenericFutureListener<Future<? super NetconfMessage>>() {
+                Futures.addCallback(netconfMessageFuture, new FutureCallback<RpcResult<NetconfMessage>>() {
                     @Override
-                    public void operationComplete(final Future<? super NetconfMessage> future) throws Exception {
-                        assertTrue("Request unsuccessful " + future.cause(), future.isSuccess());
+                    public void onSuccess(final RpcResult<NetconfMessage> result) {
                         responseCounter.incrementAndGet();
                     }
+
+                    @Override
+                    public void onFailure(final Throwable t) {
+                        throw new RuntimeException(t);
+                    }
                 });
             }
 
-            for (final Future<?> future : futures) {
-                future.await();
+            // Wait for every future
+            for (final ListenableFuture<RpcResult<NetconfMessage>> future : futures) {
+                try {
+                    future.get(3, TimeUnit.MINUTES);
+                } catch (final TimeoutException e) {
+                    fail("Request " + futures.indexOf(future) + " is not responding");
+                }
             }
 
             // Give future listeners some time to finish counter incrementation
@@ -123,10 +152,17 @@ public class NetconfITSecureTest extends AbstractNetconfConfigTest {
         }
     }
 
-    public NetconfClientConfiguration getClientConfiguration() throws IOException {
+    private NetconfMessage changeMessageId(final NetconfMessage getConfig, final int i) throws IOException, SAXException {
+        String s = XmlUtil.toString(getConfig.getDocument(), false);
+        s = s.replace("101", Integer.toString(i));
+        return new NetconfMessage(XmlUtil.readXmlToDocument(s));
+    }
+
+    public NetconfClientConfiguration getClientConfiguration(final NetconfClientSessionListener sessionListener) throws IOException {
         final NetconfClientConfigurationBuilder b = NetconfClientConfigurationBuilder.create();
         b.withAddress(TLS_ADDRESS);
-        b.withSessionListener(new SimpleNetconfClientSessionListener());
+        // Using session listener from sal-netconf-connector since stress test cannot be performed with simple listener
+        b.withSessionListener(sessionListener);
         b.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000));
         b.withProtocol(NetconfClientConfiguration.NetconfClientProtocol.SSH);
         b.withConnectionTimeoutMillis(5000);
@@ -134,6 +170,16 @@ public class NetconfITSecureTest extends AbstractNetconfConfigTest {
         return b.build();
     }
 
+    @Mock
+    private RemoteDevice<NetconfSessionCapabilities, NetconfMessage> mockedRemoteDevice;
+
+    private NetconfDeviceCommunicator getSessionListener() {
+        MockitoAnnotations.initMocks(this);
+        doNothing().when(mockedRemoteDevice).onRemoteSessionUp(any(NetconfSessionCapabilities.class), any(RemoteDeviceCommunicator.class));
+        doNothing().when(mockedRemoteDevice).onRemoteSessionDown();
+        return new NetconfDeviceCommunicator(new RemoteDeviceId("secure-test"), mockedRemoteDevice);
+    }
+
     public AuthProvider getAuthProvider() throws Exception {
         final AuthProvider mockAuth = mock(AuthProvider.class);
         doReturn("mockedAuth").when(mockAuth).toString();
index c5037d34ed4ac01fcea4b5dd264b8f715ad2cc9c..91fb805e6ab3f00a14096faea945eb0f4bad97b0 100644 (file)
@@ -7,6 +7,7 @@
     </appender>
 
   <logger name="org.opendaylight.controller.netconf" level="TRACE"/>
+  <logger name="org.opendaylight.controller.sal.connect.netconf" level="TRACE"/>
 
   <root level="error">
     <appender-ref ref="STDOUT" />
index 369c013832790eef19dc2b751baa6a9564bb7800..3d1e4784f2ded5677472e3a25d99adaa11e24cf9 100644 (file)
@@ -328,6 +328,8 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
                             ", remote window is not getting read or is too small"));
                 }
 
+                // We need to reset buffer read index, since we've already read it when we tried to write it the first time
+                ((ByteBuf) msg).resetReaderIndex();
                 logger.debug("Write pending to SSH remote on channel: {}, current pending count: {}", ctx.channel(), pendingWriteCounter);
 
                 // In case of pending, re-invoke write after pending is finished
@@ -335,12 +337,15 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
                 lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
                     @Override
                     public void operationComplete(final IoWriteFuture future) {
+                        // FIXME possible minor race condition, we cannot guarantee that this callback when pending is finished will be executed first
+                        // External thread could trigger write on this instance while we are on this line
+                        // Verify
                         if (future.isWritten()) {
                             synchronized (SshWriteAsyncHandler.this) {
                                 // Pending done, decrease counter
                                 pendingWriteCounter--;
+                                write(ctx, msg, promise);
                             }
-                            write(ctx, msg, promise);
                         } else {
                             // Cannot reschedule pending, fail
                             handlePendingFailed(ctx, e);
index 35ae71d0019ed2b0b1d4893c7e47901be6e906ea..987394402d7157f44a011c3d16ab77308855d8a6 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2013-2014 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -190,8 +190,9 @@ public class ICMP extends Packet {
             end += rawPayload.length;
         }
         int checksumStartByte = start + getfieldOffset(CHECKSUM) / NetUtils.NumBitsInAByte;
+        int even = end & ~1;
 
-        for (int i = start; i <= (end - 1); i = i + 2) {
+        for (int i = start; i < even; i = i + 2) {
             // Skip, if the current bytes are checkSum bytes
             if (i == checksumStartByte) {
                 continue;
@@ -199,7 +200,13 @@ public class ICMP extends Packet {
             wordData = ((data[i] << 8) & 0xFF00) + (data[i + 1] & 0xFF);
             sum = sum + wordData;
         }
-        carry = (sum >> 16) & 0xFF;
+        if (even < end) {
+            // Add the last octet with zero padding.
+            wordData = (data[even] << 8) & 0xFF00;
+            sum = sum + wordData;
+        }
+
+        carry = sum >>> 16;
         finalSum = (sum & 0xFFFF) + carry;
         return (short) ~((short) finalSum & 0xFFFF);
     }
index 3363f423d695f1b2f090e6c5274715f47d765e3c..56793c41f6ef624efedd743b9682bb13bede8018 100644 (file)
@@ -260,7 +260,17 @@ public class IPv4 extends Packet {
      */
     public void setHeaderField(String headerField, byte[] readValue) {
         if (headerField.equals(PROTOCOL)) {
-            payloadClass = protocolClassMap.get(readValue[0]);
+            // Don't set payloadClass if framgment offset is not zero.
+            byte[] fragoff = hdrFieldsMap.get(FRAGOFFSET);
+            if (fragoff == null || BitBufferHelper.getShort(fragoff) == 0) {
+                payloadClass = protocolClassMap.get(readValue[0]);
+            }
+        } else if (headerField.equals(FRAGOFFSET)) {
+            if (readValue != null && BitBufferHelper.getShort(readValue) != 0) {
+                // Clear payloadClass because protocol header is not present
+                // in this packet.
+                payloadClass = null;
+            }
         } else if (headerField.equals(OPTIONS) &&
                    (readValue == null || readValue.length == 0)) {
             hdrFieldsMap.remove(headerField);
index e81fbf02cfafc4550ebfb5e5558d144e55d0696d..287b73ae3c22fda416fe21a8f003bc9d6e451312 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2013-2014 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -9,6 +9,8 @@
 
 package org.opendaylight.controller.sal.packet;
 
+import java.util.Arrays;
+
 import junit.framework.Assert;
 
 import org.junit.Test;
@@ -74,28 +76,58 @@ public class ICMPTest {
                 (byte) 0x2b, (byte) 0x2c, (byte) 0x2d, (byte) 0x2e,
                 (byte) 0x2f, (byte) 0x30, (byte) 0x31, (byte) 0x32,
                 (byte) 0x33, (byte) 0x34, (byte) 0x35, (byte) 0x36, (byte) 0x37 };
+        serializeTest(icmpRawPayload, (short)0xe553);
+
+        serializeTest(null, (short)0xb108);
+        serializeTest(new byte[0], (short)0xb108);
+
+        byte[] odd = {
+            (byte)0xba, (byte)0xd4, (byte)0xc7, (byte)0x53,
+            (byte)0xf8, (byte)0x59, (byte)0x68, (byte)0x77,
+            (byte)0xfd, (byte)0x27, (byte)0xe0, (byte)0x5b,
+            (byte)0xd0, (byte)0x2e, (byte)0x28, (byte)0x41,
+            (byte)0xa3, (byte)0x48, (byte)0x5d, (byte)0x2e,
+            (byte)0x7d, (byte)0x5b, (byte)0xd3, (byte)0x60,
+            (byte)0xb3, (byte)0x88, (byte)0x8d, (byte)0x0f,
+            (byte)0x1d, (byte)0x87, (byte)0x51, (byte)0x0f,
+            (byte)0x6a, (byte)0xff, (byte)0xf7, (byte)0xd4,
+            (byte)0x40, (byte)0x35, (byte)0x4e, (byte)0x01,
+            (byte)0x36,
+        };
+        serializeTest(odd, (short)0xd0ad);
+
+        // Large payload that causes 16-bit checksum overflow more than
+        // 255 times.
+        byte[] largeEven = new byte[1024];
+        Arrays.fill(largeEven, (byte)0xff);
+        serializeTest(largeEven, (short)0xb108);
+
+        byte[] largeOdd = new byte[1021];
+        Arrays.fill(largeOdd, (byte)0xff);
+        serializeTest(largeOdd, (short)0xb207);
+    }
 
-        short checksum = (short)0xe553;
-
-        // Create ICMP object
+    private void serializeTest(byte[] payload, short checksum)
+        throws PacketException {
         ICMP icmp = new ICMP();
-        icmp.setType((byte)8);
-        icmp.setCode((byte)0);
-        icmp.setIdentifier((short) 0x46f5);
-        icmp.setSequenceNumber((short) 2);
-        icmp.setRawPayload(icmpRawPayload);
-        //icmp.setChecksum(checksum);
+        icmp.setType((byte)8).setCode((byte)0).
+            setIdentifier((short)0x46f5).setSequenceNumber((short)2);
+        int payloadSize = 0;
+        if (payload != null) {
+            icmp.setRawPayload(payload);
+            payloadSize = payload.length;
+        }
 
         // Serialize
-        byte[] stream = icmp.serialize();
-        Assert.assertTrue(stream.length == 64);
+        byte[] data = icmp.serialize();
+        Assert.assertEquals(payloadSize + 8, data.length);
 
         // Deserialize
         ICMP icmpDes = new ICMP();
-        icmpDes.deserialize(stream, 0, stream.length);
+        icmpDes.deserialize(data, 0, data.length);
 
         Assert.assertFalse(icmpDes.isCorrupted());
-        Assert.assertTrue(icmpDes.getChecksum() == checksum);
-        Assert.assertTrue(icmp.equals(icmpDes));
+        Assert.assertEquals(checksum, icmpDes.getChecksum());
+        Assert.assertEquals(icmp, icmpDes);
     }
 }
index f5298711b677a64ef9c80863580743ce2c394e52..b98342831cdf9256a01ae98698ae91babf0c2e97 100644 (file)
@@ -12,9 +12,9 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Arrays;
 
-import junit.framework.Assert;
-
+import org.junit.Assert;
 import org.junit.Test;
+
 import org.opendaylight.controller.sal.match.Match;
 import org.opendaylight.controller.sal.match.MatchType;
 import org.opendaylight.controller.sal.utils.EtherTypes;
@@ -481,4 +481,200 @@ public class IPv4Test {
         Assert.assertEquals(protocol, (byte) match.getField(MatchType.NW_PROTO).getValue());
         Assert.assertEquals(tos, (byte) match.getField(MatchType.NW_TOS).getValue());
     }
+
+    @Test
+    public void testFragment() throws Exception {
+        byte[] payload1 = new byte[0];
+        byte[] payload2 = {
+            (byte)0x61, (byte)0xd1, (byte)0x3d, (byte)0x51,
+            (byte)0x1b, (byte)0x75, (byte)0xa7, (byte)0x83,
+        };
+        byte[] payload3 = {
+            (byte)0xe7, (byte)0x0f, (byte)0x2d, (byte)0x7e,
+            (byte)0x15, (byte)0xba, (byte)0xe7, (byte)0x6d,
+            (byte)0xb5, (byte)0xc5, (byte)0xb5, (byte)0x37,
+            (byte)0x59, (byte)0xbc, (byte)0x91, (byte)0x43,
+            (byte)0xb5, (byte)0xb7, (byte)0xe4, (byte)0x28,
+            (byte)0xec, (byte)0x62, (byte)0x6b, (byte)0x6a,
+            (byte)0xd1, (byte)0xcb, (byte)0x79, (byte)0x1e,
+            (byte)0xfc, (byte)0x82, (byte)0xf5, (byte)0xb4,
+        };
+
+        // Ensure that the payload is not deserialized if the fragment offset
+        // is not zero.
+        byte proto = IPProtocols.TCP.byteValue();
+        fragmentTest(payload1, proto, (short)0xf250);
+        fragmentTest(payload2, proto, (short)0xf248);
+        fragmentTest(payload3, proto, (short)0xf230);
+
+        proto = IPProtocols.UDP.byteValue();
+        fragmentTest(payload1, proto, (short)0xf245);
+        fragmentTest(payload2, proto, (short)0xf23d);
+        fragmentTest(payload3, proto, (short)0xf225);
+
+        proto = IPProtocols.ICMP.byteValue();
+        fragmentTest(payload1, proto, (short)0xf255);
+        fragmentTest(payload2, proto, (short)0xf24d);
+        fragmentTest(payload3, proto, (short)0xf235);
+
+        // Ensure that the protocol header in the first fragment is
+        // deserialized.
+        proto = IPProtocols.TCP.byteValue();
+        TCP tcp = new TCP();
+        tcp.setSourcePort((short)1234).setDestinationPort((short)32000).
+            setSequenceNumber((int)0xd541f5f8).setAckNumber((int)0x58da787d).
+            setDataOffset((byte)5).setReserved((byte)0).
+            setHeaderLenFlags((short)0x18).setWindowSize((short)0x40e8).
+            setUrgentPointer((short)0x15f7).setChecksum((short)0x0d4e);
+        firstFragmentTest(tcp, payload1, proto, (short)0xdfe6);
+        tcp.setChecksum((short)0xab2a);
+        firstFragmentTest(tcp, payload2, proto, (short)0xdfde);
+        tcp.setChecksum((short)0x1c75);
+        firstFragmentTest(tcp, payload3, proto, (short)0xdfc6);
+
+        proto = IPProtocols.UDP.byteValue();
+        UDP udp = new UDP();
+        udp.setSourcePort((short)53).setDestinationPort((short)45383).
+            setLength((short)(payload1.length + 8)).setChecksum((short)0);
+        firstFragmentTest(udp, payload1, proto, (short)0xdfe7);
+        udp.setLength((short)(payload2.length + 8));
+        firstFragmentTest(udp, payload2, proto, (short)0xdfdf);
+        udp.setLength((short)(payload3.length + 8));
+        firstFragmentTest(udp, payload3, proto, (short)0xdfc7);
+
+        proto = IPProtocols.ICMP.byteValue();
+        ICMP icmp = new ICMP();
+        icmp.setType((byte)8).setCode((byte)0).setIdentifier((short)0x3d1e).
+            setSequenceNumber((short)1);
+        firstFragmentTest(icmp, payload1, proto, (short)0xdff7);
+        firstFragmentTest(icmp, payload2, proto, (short)0xdfef);
+        firstFragmentTest(icmp, payload3, proto, (short)0xdfd7);
+    }
+
+    private void fragmentTest(byte[] payload, byte proto, short checksum)
+        throws Exception {
+        // Construct a fragmented raw IPv4 packet.
+        int ipv4Len = 20;
+        byte[] rawIp = new byte[ipv4Len + payload.length];
+
+        byte ipVersion = 4;
+        byte dscp = 35;
+        byte ecn = 2;
+        byte tos = (byte)((dscp << 2) | ecn);
+        short totalLen = (short)rawIp.length;
+        short id = 22143;
+        short offset = 0xb9;
+        byte ttl = 64;
+        byte[] srcIp = {(byte)0x0a, (byte)0x00, (byte)0x00, (byte)0x01};
+        byte[] dstIp = {(byte)0xc0, (byte)0xa9, (byte)0x66, (byte)0x23};
+
+        rawIp[0] = (byte)((ipVersion << 4) | (ipv4Len >> 2));
+        rawIp[1] = tos;
+        rawIp[2] = (byte)(totalLen >>> Byte.SIZE);
+        rawIp[3] = (byte)totalLen;
+        rawIp[4] = (byte)(id >>> Byte.SIZE);
+        rawIp[5] = (byte)id;
+        rawIp[6] = (byte)(offset >>> Byte.SIZE);
+        rawIp[7] = (byte)offset;
+        rawIp[8] = ttl;
+        rawIp[9] = proto;
+        rawIp[10] = (byte)(checksum >>> Byte.SIZE);
+        rawIp[11] = (byte)checksum;
+        System.arraycopy(srcIp, 0, rawIp, 12, srcIp.length);
+        System.arraycopy(dstIp, 0, rawIp, 16, srcIp.length);
+        System.arraycopy(payload, 0, rawIp, ipv4Len, payload.length);
+
+        // Deserialize.
+        IPv4 ipv4 = new IPv4();
+        ipv4.deserialize(rawIp, 0, rawIp.length * Byte.SIZE);
+
+        Assert.assertEquals(ipVersion, ipv4.getVersion());
+        Assert.assertEquals(ipv4Len, ipv4.getHeaderLen());
+        Assert.assertEquals(dscp, ipv4.getDiffServ());
+        Assert.assertEquals(ecn, ipv4.getECN());
+        Assert.assertEquals(totalLen, ipv4.getTotalLength());
+        Assert.assertEquals(id, ipv4.getIdentification());
+        Assert.assertEquals((byte)0, ipv4.getFlags());
+        Assert.assertEquals(offset, ipv4.getFragmentOffset());
+        Assert.assertEquals(ttl, ipv4.getTtl());
+        Assert.assertEquals(proto, ipv4.getProtocol());
+        Assert.assertEquals(checksum, ipv4.getChecksum());
+        Assert.assertEquals(NetUtils.byteArray4ToInt(srcIp),
+                            ipv4.getSourceAddress());
+        Assert.assertEquals(NetUtils.byteArray4ToInt(dstIp),
+                            ipv4.getDestinationAddress());
+        Assert.assertFalse(ipv4.isCorrupted());
+
+        // payloadClass should not be set if fragment offset is not zero.
+        Assert.assertEquals(null, ipv4.getPayload());
+        Assert.assertArrayEquals(payload, ipv4.getRawPayload());
+    }
+
+    private void firstFragmentTest(Packet payload, byte[] rawPayload,
+                                   byte proto, short checksum)
+        throws Exception {
+        // Construct a raw IPv4 packet with MF flag.
+        int ipv4Len = 20;
+        payload.setRawPayload(rawPayload);
+        byte[] payloadBytes = payload.serialize();
+        byte[] rawIp = new byte[ipv4Len + payloadBytes.length];
+
+        byte ipVersion = 4;
+        byte dscp = 13;
+        byte ecn = 1;
+        byte tos = (byte)((dscp << 2) | ecn);
+        short totalLen = (short)rawIp.length;
+        short id = 19834;
+        byte flags = 0x1;
+        short offset = 0;
+        short off = (short)(((short)flags << 13) | offset);
+        byte ttl = 64;
+        byte[] srcIp = {(byte)0xac, (byte)0x23, (byte)0x5b, (byte)0xfd};
+        byte[] dstIp = {(byte)0xc0, (byte)0xa8, (byte)0x64, (byte)0x71};
+
+        rawIp[0] = (byte)((ipVersion << 4) | (ipv4Len >> 2));
+        rawIp[1] = tos;
+        rawIp[2] = (byte)(totalLen >>> Byte.SIZE);
+        rawIp[3] = (byte)totalLen;
+        rawIp[4] = (byte)(id >>> Byte.SIZE);
+        rawIp[5] = (byte)id;
+        rawIp[6] = (byte)(off >>> Byte.SIZE);
+        rawIp[7] = (byte)off;
+        rawIp[8] = ttl;
+        rawIp[9] = proto;
+        rawIp[10] = (byte)(checksum >>> Byte.SIZE);
+        rawIp[11] = (byte)checksum;
+        System.arraycopy(srcIp, 0, rawIp, 12, srcIp.length);
+        System.arraycopy(dstIp, 0, rawIp, 16, srcIp.length);
+        System.arraycopy(payloadBytes, 0, rawIp, ipv4Len, payloadBytes.length);
+
+        // Deserialize.
+        IPv4 ipv4 = new IPv4();
+        ipv4.deserialize(rawIp, 0, rawIp.length * Byte.SIZE);
+
+        Assert.assertEquals(ipVersion, ipv4.getVersion());
+        Assert.assertEquals(ipv4Len, ipv4.getHeaderLen());
+        Assert.assertEquals(dscp, ipv4.getDiffServ());
+        Assert.assertEquals(ecn, ipv4.getECN());
+        Assert.assertEquals(totalLen, ipv4.getTotalLength());
+        Assert.assertEquals(id, ipv4.getIdentification());
+        Assert.assertEquals(flags, ipv4.getFlags());
+        Assert.assertEquals(offset, ipv4.getFragmentOffset());
+        Assert.assertEquals(ttl, ipv4.getTtl());
+        Assert.assertEquals(proto, ipv4.getProtocol());
+        Assert.assertEquals(checksum, ipv4.getChecksum());
+        Assert.assertEquals(NetUtils.byteArray4ToInt(srcIp),
+                            ipv4.getSourceAddress());
+        Assert.assertEquals(NetUtils.byteArray4ToInt(dstIp),
+                            ipv4.getDestinationAddress());
+        Assert.assertFalse(ipv4.isCorrupted());
+
+        // Protocol header in the first fragment should be deserialized.
+        Assert.assertEquals(null, ipv4.getRawPayload());
+
+        Packet desPayload = ipv4.getPayload();
+        Assert.assertEquals(payload, desPayload);
+        Assert.assertFalse(desPayload.isCorrupted());
+        Assert.assertArrayEquals(rawPayload, desPayload.getRawPayload());
+    }
 }