Merge "Bug 781: Statistics manager is throwing IllegalStateException"
authorEd Warnicke <eaw@cisco.com>
Wed, 23 Apr 2014 11:58:13 +0000 (11:58 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 23 Apr 2014 11:58:13 +0000 (11:58 +0000)
19 files changed:
opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/MDFlowMapping.xtend
opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/ToSalConversionsUtils.java
opendaylight/md-sal/compatibility/sal-compatibility/src/test/java/org/opendaylight/controller/sal/compatibility/test/TestFromSalConversionsUtils.java
opendaylight/md-sal/compatibility/sal-compatibility/src/test/java/org/opendaylight/controller/sal/compatibility/test/TestToSalConversionsUtils.java
opendaylight/md-sal/model/model-flow-base/src/main/yang/opendaylight-flow-types.yang
opendaylight/md-sal/model/model-flow-service/src/main/yang/node-errors.yang [new file with mode: 0644]
opendaylight/md-sal/model/model-flow-service/src/main/yang/packet-processing.yang
opendaylight/md-sal/model/model-flow-service/src/main/yang/sal-flow.yang
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/SingletonHolder.java
opendaylight/md-sal/sal-binding-it/src/test/java/org/opendaylight/controller/test/sal/binding/it/NoficationTest.java
opendaylight/md-sal/sal-binding-it/src/test/java/org/opendaylight/controller/test/sal/binding/it/RoutedServiceTest.java
opendaylight/md-sal/sal-netconf-connector/pom.xml
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend [deleted file]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceListener.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTwoPhaseCommitTransaction.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfMapping.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfMapping.xtend [deleted file]
opendaylight/md-sal/samples/l2switch/implementation/src/main/java/org/opendaylight/controller/sample/l2switch/md/flow/FlowWriterServiceImpl.java

index 6a9712b1c6a41a093bd905996606290b4e33289e..75cbf49ee50fa2b529714ae4807f6426ac4f029d 100644 (file)
@@ -113,6 +113,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instru
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowCookie
 
 public class MDFlowMapping {
 
@@ -127,7 +128,7 @@ public class MDFlowMapping {
 
         hardTimeout = sourceFlow.hardTimeout as int
         idleTimeout = sourceFlow.idleTimeout as int
-        cookie = BigInteger.valueOf(sourceFlow.id)
+        cookie = new FlowCookie(BigInteger.valueOf(sourceFlow.id))
         priority = sourceFlow.priority as int
 
         val sourceActions = sourceFlow.actions;
@@ -150,7 +151,7 @@ public class MDFlowMapping {
        val it = new FlowBuilder();
        hardTimeout = sourceFlow.hardTimeout as int
        idleTimeout = sourceFlow.idleTimeout as int
-       cookie = BigInteger.valueOf(sourceFlow.id)
+       cookie = new FlowCookie(BigInteger.valueOf(sourceFlow.id))
        priority = sourceFlow.priority as int
        id = new FlowId(flowId)
     
@@ -404,7 +405,7 @@ public class MDFlowMapping {
 
         hardTimeout = sourceFlow.hardTimeout as int
         idleTimeout = sourceFlow.idleTimeout as int
-        cookie = BigInteger.valueOf(sourceFlow.id)
+        cookie = new FlowCookie(BigInteger.valueOf(sourceFlow.id))
         priority = sourceFlow.priority as int
 
         val sourceActions = sourceFlow.actions;
index cb7289685831b75d275335c1aa6c9b2c44bc4532..74b94c7cba10771fee33f10eeddb6e6a0c2e2b16 100644 (file)
@@ -148,7 +148,7 @@ public class ToSalConversionsUtils {
             target.setActions(actionFrom(actions, node));
         }
 
-        target.setId(source.getCookie().longValue());
+        target.setId(source.getCookie().getValue().longValue());
         return target;
     }
 
index b062b0016d54340040dc6a827b5cf6eb745f4076..63c5664a0c4b2de63e5b6e90279eb59f37e050ef 100644 (file)
@@ -177,7 +177,7 @@ public class TestFromSalConversionsUtils {
     }
 
     private void checkOdFlow(NodeFlow odNodeFlow) {
-        assertEquals("Cookie is incorrect.", 9223372036854775807L, odNodeFlow.getCookie().longValue());
+        assertEquals("Cookie is incorrect.", 9223372036854775807L, odNodeFlow.getCookie().getValue().longValue());
         assertEquals("Hard timeout is incorrect.", 32765, odNodeFlow.getHardTimeout().shortValue());
         assertEquals("Iddle timeout is incorrect.", 32766, odNodeFlow.getIdleTimeout().shortValue());
         assertEquals("Priority is incorrect.", 32767, odNodeFlow.getPriority().shortValue());
index ca16c65f5613ac7682a304aa8e171a6701f5675d..71f2e9480543feae77bb92275595c29d2119f1c6 100644 (file)
@@ -105,6 +105,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.addr
 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.address.address.Ipv6Builder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAddedBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeFlow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowCookie;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Instructions;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.InstructionsBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
@@ -347,7 +348,7 @@ public class TestToSalConversionsUtils {
     private FlowAddedBuilder prepareOdFlowCommon() {
         FlowAddedBuilder odNodeFlowBuilder = new FlowAddedBuilder();
 
-        odNodeFlowBuilder.setCookie(new BigInteger("9223372036854775807"));
+        odNodeFlowBuilder.setCookie(new FlowCookie(new BigInteger("9223372036854775807")));
         odNodeFlowBuilder.setHardTimeout(32767);
         odNodeFlowBuilder.setIdleTimeout(32767);
         odNodeFlowBuilder.setPriority(32767);
index 02bdd1a1fa8a8510ebad0b27be5a2aa29e1bdd9e..0d8a0b6f549c1d9583d54386d045cb9bd4b83468 100644 (file)
@@ -15,6 +15,11 @@ module opendaylight-flow-types {
         type instance-identifier;
     }
     
+    typedef flow-cookie {
+        description "openflow specific type - flow cookie / flow cookie mask";
+        type uint64;
+    }
+    
     typedef output-port-values {
         type enumeration {
             enum MAX {
@@ -143,7 +148,7 @@ module opendaylight-flow-types {
         }
         
         leaf cookie {
-            type uint64;
+            type flow-cookie;
         }
         
         leaf table_id {
@@ -167,7 +172,7 @@ module opendaylight-flow-types {
         }
         
         leaf cookie_mask {
-            type uint64;
+            type flow-cookie;
         }
         
         leaf buffer_id {
diff --git a/opendaylight/md-sal/model/model-flow-service/src/main/yang/node-errors.yang b/opendaylight/md-sal/model/model-flow-service/src/main/yang/node-errors.yang
new file mode 100644 (file)
index 0000000..a8eec26
--- /dev/null
@@ -0,0 +1,168 @@
+module node-error {
+    namespace "urn:opendaylight:node:error:service";
+    prefix node-error;
+
+    import sal-flow {prefix flow; revision-date "2013-08-19";}
+    import flow-errors {prefix error;}
+    import flow-capable-transaction {prefix tr;}
+    import yang-ext {prefix ext; revision-date "2013-07-09";}
+    import opendaylight-inventory {prefix inv;revision-date "2013-08-19";}
+    import opendaylight-flow-types {prefix types;revision-date "2013-10-26";}
+    import opendaylight-group-types {prefix group-type;revision-date 2013-10-18;}
+    import opendaylight-meter-types {prefix meter-type;revision-date "2013-09-18";}
+
+    revision "2014-04-10" {
+        description "Initial revision of errors received from a node";
+    }
+
+    notification hello-failed-error-notification {
+        description "Model for ofp_error-Type=0, Hello protocol failed";
+
+        uses error:error-message;
+        uses tr:transaction-aware;
+        uses tr:transaction-metadata;
+        uses flow:base-node-error-notification;
+        uses flow:node-error-reference;
+    }
+
+    notification bad-request-error-notification {
+        description "Model for ofp_error-Type=1, Request was not understood.";
+
+        uses error:error-message;
+        uses tr:transaction-aware;
+        uses tr:transaction-metadata;
+        uses flow:base-node-error-notification;
+        uses flow:node-error-reference;
+    }
+
+    notification bad-action-error-notification {
+        description "Model for ofp_error-Type=2, Error in action description.";
+
+        uses error:error-message;
+        uses tr:transaction-aware;
+        uses tr:transaction-metadata;
+        uses flow:base-node-error-notification;
+        uses flow:node-error-reference;
+    }
+
+    notification bad-instruction-error-notification {
+            description "Model for ofp_error-Type=3, Error in instruction list.";
+
+            uses error:error-message;
+            uses tr:transaction-aware;
+            uses tr:transaction-metadata;
+            uses flow:base-node-error-notification;
+            uses flow:node-error-reference;
+        }
+
+    notification bad-match-error-notification {
+            description "Model for ofp_error-Type=4, Error in match.";
+
+            uses error:error-message;
+            uses tr:transaction-aware;
+            uses tr:transaction-metadata;
+            uses flow:base-node-error-notification;
+            uses flow:node-error-reference;
+        }
+
+    notification flow-mod-error-notification {
+            description "Model for ofp_error-Type=5 - Problem modifying flow entry.";
+
+            uses error:error-message;
+            uses tr:transaction-aware;
+            uses tr:transaction-metadata;
+            uses flow:base-node-error-notification;
+            uses flow:node-error-reference;
+            uses flow:node-error-reference;
+        }
+
+    notification group-mod-error-notification {
+            description "Model for ofp_error-Type=6 - Problem modifying group entry.";
+
+            uses error:error-message;
+            uses tr:transaction-aware;
+            uses tr:transaction-metadata;
+            uses flow:base-node-error-notification;
+            uses flow:node-error-reference;
+        }
+
+    notification port-mod-error-notification {
+            description "Model for ofp_error-Type=7 - Port mod request failed.";
+
+            uses error:error-message;
+            uses tr:transaction-aware;
+            uses tr:transaction-metadata;
+            uses flow:base-node-error-notification;
+            uses flow:node-error-reference;
+        }
+
+    notification table-mod-error-notification {
+            description "Model for ofp_error-Type=8 - Table mod request failed.";
+
+            uses error:error-message;
+            uses tr:transaction-aware;
+            uses tr:transaction-metadata;
+            uses flow:base-node-error-notification;
+            uses flow:node-error-reference;
+        }
+
+    notification queue-op-error-notification {
+            description "Model for ofp_error-Type=9 - Queue operation failed.";
+
+            uses error:error-message;
+            uses tr:transaction-aware;
+            uses tr:transaction-metadata;
+            uses flow:base-node-error-notification;
+            uses flow:node-error-reference;
+        }
+
+    notification switch-config-error-notification {
+            description "Model for ofp_error-Type=10 - Switch Config request failed.";
+
+            uses error:error-message;
+            uses tr:transaction-aware;
+            uses tr:transaction-metadata;
+            uses flow:base-node-error-notification;
+            uses flow:node-error-reference;
+        }
+
+    notification role-request-error-notification {
+            description "Model for ofp_error-Type=11 - Controller Role request failed.";
+
+            uses error:error-message;
+            uses tr:transaction-aware;
+            uses tr:transaction-metadata;
+            uses flow:base-node-error-notification;
+            uses flow:node-error-reference;
+        }
+
+    notification meter-mod-error-notification {
+            description "Model for ofp_error-Type=12 - Error in meter.";
+
+            uses error:error-message;
+            uses tr:transaction-aware;
+            uses tr:transaction-metadata;
+            uses flow:base-node-error-notification;
+            uses flow:node-error-reference;
+        }
+
+    notification table-features-error-notification {
+            description "Model for ofp_error-Type=13 - Setting table features failed.";
+
+            uses error:error-message;
+            uses tr:transaction-aware;
+            uses tr:transaction-metadata;
+            uses flow:base-node-error-notification;
+            uses flow:node-error-reference;
+        }
+
+    notification experimenter-error-notification {
+            description "Model for ofp_error-Type=65535  - Experimenter Error Messages";
+            uses error:error-message;
+            uses tr:transaction-aware;
+            uses tr:transaction-metadata;
+            uses flow:base-node-error-notification;
+            uses flow:node-error-reference;
+        }
+}
+
index f3db3181f7935665a8731d5bb76119aac5036df4..50368e46a6a3f100725ac2fa80f3bca4eb9dcec2 100644 (file)
@@ -9,13 +9,14 @@ module packet-processing {
     import opendaylight-match-types {prefix match-type;revision-date "2013-10-26";}
     import opendaylight-table-types {prefix table-type;revision-date "2013-10-26";}
     import opendaylight-action-types {prefix action-type;revision-date "2013-11-12";}
+    import opendaylight-flow-types {prefix flow-type;revision-date "2013-10-26";}
 
     
     revision "2013-07-09" {
         description "";
     }
 
-    typedef cookie {
+    typedef connection-cookie {
         type uint32;
     }
 
@@ -58,8 +59,12 @@ module packet-processing {
     }
 
     notification packet-received {
-       leaf cookie {
-                type cookie;
+       leaf connection-cookie {
+                type connection-cookie;
+        }
+        
+        leaf flow-cookie {
+            type flow-type:flow-cookie;
         }
         
         leaf table-id {
@@ -83,14 +88,14 @@ module packet-processing {
        input {
             uses inv:node-context-ref;
 
-            leaf cookie {
-                type cookie;
+            leaf connection-cookie {
+                type connection-cookie;
             }
             
                leaf egress {
                        type inv:node-connector-ref;
                }
-               leaf buffer-id {
+            leaf buffer-id {
                   type uint32;
             }
 
index b3e6e450af8c3f9d4b0d593b91547e0f48023620..4cb1d08917a5040e06397d334df383cb99cac35e 100644 (file)
@@ -38,6 +38,33 @@ module sal-flow {
         uses types:flow;
     }
 
+    grouping base-node-error-notification {
+        leaf node {
+            ext:context-reference "inv:node-context";
+                type inv:node-ref;
+        }
+    }
+
+    grouping node-error-reference {
+        choice object-reference {
+            case flow-ref{
+                leaf flow-ref {
+                        type types:flow-ref;
+                }
+            }
+            case group-ref{
+                leaf group-ref {
+                    type group-type:group-ref;
+                }
+            }
+            case meter-ref{
+                leaf meter-ref {
+                    type meter-type:meter-ref;
+                }
+            }
+        }
+    }
+
     /** Base configuration structure **/
     grouping flow-update {
         uses "inv:node-context-ref";
@@ -124,28 +151,8 @@ module sal-flow {
         uses error:error-message;
         uses tr:transaction-aware;
         uses tr:transaction-metadata;
-        choice object-reference {
-        case flow-ref{
-                   leaf flow-ref {
-            type types:flow-ref;
-         }
-       }
-        case group-ref{
-                   leaf group-ref {
-            type group-type:group-ref;
-         }
-       }
-        case meter-ref{
-                   leaf meter-ref {
-            type meter-type:meter-ref;
-         }
-       }
-     }
-        leaf node {
-            ext:context-reference "inv:node-context";
-            type inv:node-ref;
-        }
-        
+        uses node-error-reference;
+        uses base-node-error-notification;
     }
     
     notification node-experimenter-error-notification {
index 4deef69198216c5d0f6fe1578caccee90dbc1458..ac26445bf408677b6e397c9fbb77bcf4fcaa19de 100644 (file)
@@ -7,22 +7,22 @@
  */
 package org.opendaylight.controller.sal.binding.codegen.impl;
 
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import javassist.ClassPool;
+import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeGenerator;
+import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory;
+
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import javassist.ClassPool;
-
-import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeGenerator;
-import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory;
-
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 public class SingletonHolder {
 
     public static final ClassPool CLASS_POOL = ClassPool.getDefault();
@@ -45,12 +45,43 @@ public class SingletonHolder {
      */
     @Deprecated
     public static synchronized final ListeningExecutorService getDefaultNotificationExecutor() {
+
         if (NOTIFICATION_EXECUTOR == null) {
+            // Overriding the queue since we need an unbounded queue
+            // and threadpoolexecutor would not create new threads if the queue is not full
+            BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
+                @Override
+                public boolean offer(Runnable r) {
+                    if (size() <= 1) {
+                        // if the queue is empty (or has just 1), no need to rampup the threads
+                        return super.offer(r);
+                    } else {
+                        // if the queue is not empty, force the queue to return false.
+                        // threadpoolexecutor will spawn a new thread if the queue.offer returns false.
+                        return false;
+                    }
+                }
+            };
+
             ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("md-sal-binding-notification-%d").build();
-            ExecutorService executor = new ThreadPoolExecutor(CORE_NOTIFICATION_THREADS, MAX_NOTIFICATION_THREADS,
-                    NOTIFICATION_THREAD_LIFE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory);
+
+            ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_NOTIFICATION_THREADS, MAX_NOTIFICATION_THREADS,
+                    NOTIFICATION_THREAD_LIFE, TimeUnit.SECONDS, queue , factory,
+                    new RejectedExecutionHandler() {
+                        // if the max threads are met, then it will raise a rejectedExecution. We then push to the queue.
+                        @Override
+                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                            try {
+                                executor.getQueue().put(r);
+                            } catch (InterruptedException e) {
+                                e.printStackTrace();
+                            }
+                        }
+                    });
+
             NOTIFICATION_EXECUTOR = MoreExecutors.listeningDecorator(executor);
         }
+
         return NOTIFICATION_EXECUTOR;
     }
 
index b23ceaaf15b69c0bed04977b97e3bcefb7eab50a..9519a2a732491f9fc51bb16f26f8bd3bc6958989 100644 (file)
@@ -31,6 +31,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.Node
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeExperimenterErrorNotification;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowCookie;
 import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.binding.NotificationListener;
 import org.opendaylight.yangtools.yang.binding.RpcService;
@@ -99,7 +100,7 @@ public class NoficationTest extends AbstractTest {
          *
          */
         assertEquals(1, listener1.addedFlows.size());
-        assertEquals(0, listener1.addedFlows.get(0).getCookie().intValue());
+        assertEquals(0, listener1.addedFlows.get(0).getCookie().getValue().intValue());
 
         /**
          * The registration of the Consumer 2. SalFlowListener is registered
@@ -196,7 +197,7 @@ public class NoficationTest extends AbstractTest {
      */
     public static FlowAdded flowAdded(int i) {
         FlowAddedBuilder ret = new FlowAddedBuilder();
-        ret.setCookie(BigInteger.valueOf(i));
+        ret.setCookie(new FlowCookie(BigInteger.valueOf(i)));
         return ret.build();
     }
 
index 9c24ca08303b4f524198808b174ce0af67469bc6..d49d6f0e25e271e43c8550feb5eef63d96301184 100644 (file)
@@ -26,6 +26,7 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowCookie;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
@@ -209,7 +210,7 @@ public class RoutedServiceTest extends AbstractTest {
     static AddFlowInput createSampleAddFlow(NodeRef node, int cookie) {
         AddFlowInputBuilder ret = new AddFlowInputBuilder();
         ret.setNode(node);
-        ret.setCookie(BigInteger.valueOf(cookie));
+        ret.setCookie(new FlowCookie(BigInteger.valueOf(cookie)));
         return ret.build();
     }
 }
index 9b701203ce44ee29c86ddd83a9b72a6a3ed3218b..27d320f03e656587bfabfc4b6ad95d369e55cd99 100644 (file)
       <groupId>${project.groupId}</groupId>
       <artifactId>sal-connector-api</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.eclipse.xtend</groupId>
-      <artifactId>org.eclipse.xtend.lib</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>ietf-netconf-monitoring</artifactId>
         </configuration>
       </plugin>
 
-      <plugin>
-        <groupId>org.eclipse.xtend</groupId>
-        <artifactId>xtend-maven-plugin</artifactId>
-      </plugin>
-
       <plugin>
         <groupId>org.opendaylight.yangtools</groupId>
         <artifactId>yang-maven-plugin</artifactId>
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java
new file mode 100644 (file)
index 0000000..8d52950
--- /dev/null
@@ -0,0 +1,512 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connect.netconf;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_CONNECTED;
+import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_ID;
+import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_NODE;
+import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_PATH;
+import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.NETCONF_INVENTORY_INITIAL_CAPABILITY;
+import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.CONFIG_SOURCE_RUNNING;
+import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_DATA_QNAME;
+import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_CONFIG_QNAME;
+import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_QNAME;
+import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toFilterStructure;
+import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toRpcMessage;
+import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.wrap;
+
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
+import org.opendaylight.controller.md.sal.common.api.data.DataModification;
+import org.opendaylight.controller.md.sal.common.api.data.DataReader;
+import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
+import org.opendaylight.controller.sal.core.api.Provider;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.sal.core.api.data.DataBrokerService;
+import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
+import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
+import org.opendaylight.controller.sal.core.api.mount.MountProvisionService;
+import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.api.SimpleNode;
+import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
+import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
+import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl;
+import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider;
+import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
+import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
+import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.util.concurrent.EventExecutor;
+
+public class NetconfDevice implements Provider, //
+        DataReader<InstanceIdentifier, CompositeNode>, //
+        DataCommitHandler<InstanceIdentifier, CompositeNode>, //
+        RpcImplementation, //
+        AutoCloseable {
+
+    InetSocketAddress socketAddress;
+
+    MountProvisionInstance mountInstance;
+
+    EventExecutor eventExecutor;
+
+    ExecutorService processingExecutor;
+
+    InstanceIdentifier path;
+
+    ReconnectStrategy reconnectStrategy;
+
+    AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
+
+    private NetconfDeviceSchemaContextProvider deviceContextProvider;
+
+    protected Logger logger;
+
+    Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg;
+    Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg;
+    Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg;
+    List<RpcRegistration> rpcReg;
+
+    String name;
+
+    MountProvisionService mountService;
+
+    NetconfClientDispatcher dispatcher;
+
+    static InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
+
+    SchemaSourceProvider<InputStream> remoteSourceProvider;
+
+    DataBrokerService dataBroker;
+
+    NetconfDeviceListener listener;
+
+    private boolean rollbackSupported;
+
+
+    public NetconfDevice(String name) {
+        this.name = name;
+        this.logger = LoggerFactory.getLogger(NetconfDevice.class + "#" + name);
+        this.path = InstanceIdentifier.builder(INVENTORY_PATH)
+                .nodeWithKey(INVENTORY_NODE, Collections.<QName, Object>singletonMap(INVENTORY_ID, name)).toInstance();
+    }
+
+    public void start() {
+        checkState(dispatcher != null, "Dispatcher must be set.");
+        checkState(schemaSourceProvider != null, "Schema Source Provider must be set.");
+        checkState(eventExecutor != null, "Event executor must be set.");
+
+        listener = new NetconfDeviceListener(this);
+
+        logger.info("Starting NETCONF Client {} for address {}", name, socketAddress);
+
+        dispatcher.createClient(socketAddress, listener, reconnectStrategy);
+    }
+
+    Optional<SchemaContext> getSchemaContext() {
+        if (deviceContextProvider == null) {
+            return Optional.absent();
+        }
+        return deviceContextProvider.currentContext;
+    }
+
+    void bringDown() {
+        if (rpcReg != null) {
+            for (RpcRegistration reg : rpcReg) {
+                reg.close();
+            }
+            rpcReg = null;
+        }
+        closeGracefully(confReaderReg);
+        confReaderReg = null;
+        closeGracefully(operReaderReg);
+        operReaderReg = null;
+        closeGracefully(commitHandlerReg);
+        commitHandlerReg = null;
+
+        updateDeviceState(false, Collections.<QName> emptySet());
+    }
+
+    private void closeGracefully(final AutoCloseable resource) {
+        if (resource != null) {
+            try {
+                resource.close();
+            } catch (Exception e) {
+                logger.warn("Ignoring exception while closing {}", resource, e);
+            }
+        }
+    }
+
+    void bringUp(final SchemaSourceProvider<String> delegate, final Set<QName> capabilities, final boolean rollbackSupported) {
+        // This has to be called from separate thread, not from netty thread calling onSessionUp in DeviceListener.
+        // Reason: delegate.getSchema blocks thread when waiting for response
+        // however, if the netty thread is blocked, no incoming message can be processed
+        // ... netty should pick another thread from pool to process incoming message, but it does not http://netty.io/wiki/thread-model.html
+        // TODO redesign +refactor
+        processingExecutor.submit(new Runnable() {
+            @Override
+            public void run() {
+                NetconfDevice.this.rollbackSupported = rollbackSupported;
+                remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
+                deviceContextProvider = new NetconfDeviceSchemaContextProvider(NetconfDevice.this, remoteSourceProvider);
+                deviceContextProvider.createContextFromCapabilities(capabilities);
+                if (mountInstance != null && getSchemaContext().isPresent()) {
+                    mountInstance.setSchemaContext(getSchemaContext().get());
+                }
+
+                updateDeviceState(true, capabilities);
+
+                if (mountInstance != null) {
+                    confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, NetconfDevice.this);
+                    operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, NetconfDevice.this);
+                    commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, NetconfDevice.this);
+
+                    List<RpcRegistration> rpcs = new ArrayList<>();
+                    // TODO same condition twice
+                    if (mountInstance != null && getSchemaContext().isPresent()) {
+                        for (RpcDefinition rpc : mountInstance.getSchemaContext().getOperations()) {
+                            rpcs.add(mountInstance.addRpcImplementation(rpc.getQName(), NetconfDevice.this));
+                        }
+                    }
+                    rpcReg = rpcs;
+                }
+            }
+        });
+    }
+
+    private void updateDeviceState(boolean up, Set<QName> capabilities) {
+        DataModificationTransaction transaction = dataBroker.beginTransaction();
+
+        CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder();
+        it.setQName(INVENTORY_NODE);
+        it.addLeaf(INVENTORY_ID, name);
+        it.addLeaf(INVENTORY_CONNECTED, up);
+
+        logger.debug("Client capabilities {}", capabilities);
+        for (QName capability : capabilities) {
+            it.addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability);
+        }
+
+        logger.debug("Update device state transaction " + transaction.getIdentifier()
+                + " putting operational data started.");
+        transaction.removeOperationalData(path);
+        transaction.putOperationalData(path, it.toInstance());
+        logger.debug("Update device state transaction " + transaction.getIdentifier()
+                + " putting operational data ended.");
+
+        // FIXME: this has to be asynchronous
+        RpcResult<TransactionStatus> transactionStatus = null;
+        try {
+            transactionStatus = transaction.commit().get();
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Interrupted while waiting for response", e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException("Read configuration data " + path + " failed", e);
+        }
+        // TODO better ex handling
+
+        if (transactionStatus.isSuccessful()) {
+            logger.debug("Update device state transaction " + transaction.getIdentifier() + " SUCCESSFUL.");
+        } else {
+            logger.debug("Update device state transaction " + transaction.getIdentifier() + " FAILED!");
+            logger.debug("Update device state transaction status " + transaction.getStatus());
+        }
+    }
+
+    @Override
+    public CompositeNode readConfigurationData(InstanceIdentifier path) {
+        RpcResult<CompositeNode> result = null;
+        try {
+            result = this.invokeRpc(NETCONF_GET_CONFIG_QNAME,
+                    wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, toFilterStructure(path))).get();
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Interrupted while waiting for response", e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException("Read configuration data " + path + " failed", e);
+        }
+
+        CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
+        return data == null ? null : (CompositeNode) findNode(data, path);
+    }
+
+    @Override
+    public CompositeNode readOperationalData(InstanceIdentifier path) {
+        RpcResult<CompositeNode> result = null;
+        try {
+            result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, toFilterStructure(path))).get();
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Interrupted while waiting for response", e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException("Read configuration data " + path + " failed", e);
+        }
+
+        CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
+        return (CompositeNode) findNode(data, path);
+    }
+
+    @Override
+    public Set<QName> getSupportedRpcs() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
+        return listener.sendRequest(toRpcMessage(rpc, input, getSchemaContext()));
+    }
+
+    @Override
+    public Collection<ProviderFunctionality> getProviderFunctionality() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public void onSessionInitiated(ProviderSession session) {
+        dataBroker = session.getService(DataBrokerService.class);
+
+        DataModificationTransaction transaction = dataBroker.beginTransaction();
+        if (operationalNodeNotExisting(transaction)) {
+            transaction.putOperationalData(path, getNodeWithId());
+        }
+        if (configurationNodeNotExisting(transaction)) {
+            transaction.putConfigurationData(path, getNodeWithId());
+        }
+
+        try {
+            transaction.commit().get();
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Interrupted while waiting for response", e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException("Read configuration data " + path + " failed", e);
+        }
+
+        mountService = session.getService(MountProvisionService.class);
+        if (mountService != null) {
+            mountInstance = mountService.createOrGetMountPoint(path);
+        }
+    }
+
+    CompositeNode getNodeWithId() {
+        SimpleNodeTOImpl id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
+        return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.<Node<?>> singletonList(id));
+    }
+
+    boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
+        return null == transaction.readConfigurationData(path);
+    }
+
+    boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
+        return null == transaction.readOperationalData(path);
+    }
+
+    static Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
+
+        Node<?> current = node;
+        for (InstanceIdentifier.PathArgument arg : identifier.getPath()) {
+            if (current instanceof SimpleNode<?>) {
+                return null;
+            } else if (current instanceof CompositeNode) {
+                CompositeNode currentComposite = (CompositeNode) current;
+
+                current = currentComposite.getFirstCompositeByName(arg.getNodeType());
+                if (current == null) {
+                    current = currentComposite.getFirstCompositeByName(arg.getNodeType().withoutRevision());
+                }
+                if (current == null) {
+                    current = currentComposite.getFirstSimpleByName(arg.getNodeType());
+                }
+                if (current == null) {
+                    current = currentComposite.getFirstSimpleByName(arg.getNodeType().withoutRevision());
+                }
+                if (current == null) {
+                    return null;
+                }
+            }
+        }
+        return current;
+    }
+
+    @Override
+    public DataCommitTransaction<InstanceIdentifier, CompositeNode> requestCommit(
+            DataModification<InstanceIdentifier, CompositeNode> modification) {
+        NetconfDeviceTwoPhaseCommitTransaction twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this,
+                modification, true, rollbackSupported);
+        try {
+            twoPhaseCommit.prepare();
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Interrupted while waiting for response", e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException("Read configuration data " + path + " failed", e);
+        }
+         return twoPhaseCommit;
+    }
+
+    Set<QName> getCapabilities(Collection<String> capabilities) {
+        return FluentIterable.from(capabilities).filter(new Predicate<String>() {
+            @Override
+            public boolean apply(final String capability) {
+                return capability.contains("?") && capability.contains("module=") && capability.contains("revision=");
+            }
+        }).transform(new Function<String, QName>() {
+            @Override
+            public QName apply(final String capability) {
+                String[] parts = capability.split("\\?");
+                String namespace = parts[0];
+                FluentIterable<String> queryParams = FluentIterable.from(Arrays.asList(parts[1].split("&")));
+
+                String revision = getStringAndTransform(queryParams, "revision=", "revision=");
+
+                String moduleName = getStringAndTransform(queryParams, "module=", "module=");
+
+                if (revision == null) {
+                    logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
+                    revision = getStringAndTransform(queryParams, "amp;revision==", "revision=");
+
+                    if (revision != null) {
+                        logger.warn("Netconf device returned revision incorectly escaped for {}", capability);
+                    }
+                }
+                if (revision == null) {
+                    return QName.create(URI.create(namespace), null, moduleName);
+                }
+                return QName.create(namespace, revision, moduleName);
+            }
+
+            private String getStringAndTransform(final Iterable<String> queryParams, final String match,
+                    final String substringToRemove) {
+                Optional<String> found = Iterables.tryFind(queryParams, new Predicate<String>() {
+                    @Override
+                    public boolean apply(final String input) {
+                        return input.startsWith(match);
+                    }
+                });
+
+                return found.isPresent() ? found.get().replaceAll(substringToRemove, "") : null;
+            }
+
+        }).toSet();
+    }
+
+    @Override
+    public void close() {
+        bringDown();
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public InetSocketAddress getSocketAddress() {
+        return socketAddress;
+    }
+
+    public MountProvisionInstance getMountInstance() {
+        return mountInstance;
+    }
+
+    public void setReconnectStrategy(final ReconnectStrategy reconnectStrategy) {
+        this.reconnectStrategy = reconnectStrategy;
+    }
+
+    public void setProcessingExecutor(final ExecutorService processingExecutor) {
+        this.processingExecutor = processingExecutor;
+    }
+
+    public void setSocketAddress(final InetSocketAddress socketAddress) {
+        this.socketAddress = socketAddress;
+    }
+
+    public void setEventExecutor(final EventExecutor eventExecutor) {
+        this.eventExecutor = eventExecutor;
+    }
+
+    public void setSchemaSourceProvider(final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider) {
+        this.schemaSourceProvider = schemaSourceProvider;
+    }
+
+    public void setDispatcher(final NetconfClientDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+}
+
+class NetconfDeviceSchemaContextProvider {
+
+    NetconfDevice device;
+
+    SchemaSourceProvider<InputStream> sourceProvider;
+
+    Optional<SchemaContext> currentContext;
+
+    NetconfDeviceSchemaContextProvider(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
+        this.device = device;
+        this.sourceProvider = sourceProvider;
+        this.currentContext = Optional.absent();
+    }
+
+    void createContextFromCapabilities(Iterable<QName> capabilities) {
+        YangSourceContext sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider);
+        if (!sourceContext.getMissingSources().isEmpty()) {
+            device.logger.warn("Sources for following models are missing {}", sourceContext.getMissingSources());
+        }
+        device.logger.debug("Trying to create schema context from {}", sourceContext.getValidSources());
+        List<InputStream> modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
+        if (!sourceContext.getValidSources().isEmpty()) {
+            SchemaContext schemaContext = tryToCreateContext(modelsToParse);
+            currentContext = Optional.fromNullable(schemaContext);
+        } else {
+            currentContext = Optional.absent();
+        }
+        if (currentContext.isPresent()) {
+            device.logger.debug("Schema context successfully created.");
+        }
+    }
+
+    SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
+        YangParserImpl parser = new YangParserImpl();
+        try {
+
+            Set<Module> models = parser.parseYangModelsFromStreams(modelsToParse);
+            return parser.resolveSchemaContext(models);
+        } catch (Exception e) {
+            device.logger.debug("Error occured during parsing YANG schemas", e);
+            return null;
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend
deleted file mode 100644 (file)
index 0b88c66..0000000
+++ /dev/null
@@ -1,368 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.connect.netconf
-
-import com.google.common.base.Optional
-import com.google.common.collect.FluentIterable
-import io.netty.util.concurrent.EventExecutor
-import java.io.InputStream
-import java.net.InetSocketAddress
-import java.net.URI
-import java.util.ArrayList
-import java.util.Collection
-import java.util.Collections
-import java.util.List
-import java.util.Set
-import java.util.concurrent.ExecutorService
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
-import org.opendaylight.controller.md.sal.common.api.data.DataModification
-import org.opendaylight.controller.md.sal.common.api.data.DataReader
-import org.opendaylight.controller.netconf.client.NetconfClientDispatcher
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession
-import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration
-import org.opendaylight.controller.sal.core.api.Provider
-import org.opendaylight.controller.sal.core.api.RpcImplementation
-import org.opendaylight.controller.sal.core.api.data.DataBrokerService
-import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction
-import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance
-import org.opendaylight.controller.sal.core.api.mount.MountProvisionService
-import org.opendaylight.protocol.framework.ReconnectStrategy
-import org.opendaylight.yangtools.concepts.Registration
-import org.opendaylight.yangtools.yang.common.QName
-import org.opendaylight.yangtools.yang.data.api.CompositeNode
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
-import org.opendaylight.yangtools.yang.data.api.Node
-import org.opendaylight.yangtools.yang.data.api.SimpleNode
-import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl
-import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode
-import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
-import org.opendaylight.yangtools.yang.model.api.SchemaContext
-import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider
-import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider
-import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl
-import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
-
-import static com.google.common.base.Preconditions.*
-import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*
-
-import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*
-
-class NetconfDevice implements Provider, //
-DataReader<InstanceIdentifier, CompositeNode>, //
-DataCommitHandler<InstanceIdentifier, CompositeNode>, //
-RpcImplementation, //
-AutoCloseable {
-
-    @Property
-    var InetSocketAddress socketAddress;
-
-    @Property
-    var MountProvisionInstance mountInstance;
-
-    @Property
-    var EventExecutor eventExecutor;
-
-    @Property
-    var ExecutorService processingExecutor;
-
-    @Property
-    var InstanceIdentifier path;
-
-    @Property
-    var ReconnectStrategy reconnectStrategy;
-
-    @Property
-    var AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
-
-    @Property
-    private NetconfDeviceSchemaContextProvider deviceContextProvider
-
-    protected val Logger logger
-
-    Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg
-    Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg
-    Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg
-    List<RpcRegistration> rpcReg
-
-    @Property
-    val String name
-
-    MountProvisionService mountService
-
-    @Property
-    var NetconfClientDispatcher dispatcher
-
-    static val InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
-
-    @Property
-    var SchemaSourceProvider<InputStream> remoteSourceProvider
-
-    DataBrokerService dataBroker
-
-    var NetconfDeviceListener listener;
-
-    public new(String name) {
-        this._name = name;
-        this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name);
-        this.path = InstanceIdentifier.builder(INVENTORY_PATH).nodeWithKey(INVENTORY_NODE,
-            Collections.singletonMap(INVENTORY_ID, name)).toInstance;
-    }
-
-    def start() {
-        checkState(dispatcher != null, "Dispatcher must be set.");
-        checkState(schemaSourceProvider != null, "Schema Source Provider must be set.")
-        checkState(eventExecutor != null, "Event executor must be set.");
-
-        listener = new NetconfDeviceListener(this);
-
-        logger.info("Starting NETCONF Client {} for address {}", name, socketAddress);
-
-        dispatcher.createClient(socketAddress, listener, reconnectStrategy);
-    }
-
-    def Optional<SchemaContext> getSchemaContext() {
-        if (deviceContextProvider == null) {
-            return Optional.absent();
-        }
-        return deviceContextProvider.currentContext;
-    }
-
-    def bringDown() {
-        if (rpcReg != null) {
-            for (reg : rpcReg) {
-                reg.close()
-            }
-            rpcReg = null
-        }
-        confReaderReg?.close()
-        confReaderReg = null
-        operReaderReg?.close()
-        operReaderReg = null
-        commitHandlerReg?.close()
-        commitHandlerReg = null
-
-        updateDeviceState(false, Collections.emptySet())
-    }
-
-    def bringUp(SchemaSourceProvider<String> delegate, Set<QName> capabilities) {
-        remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
-        deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
-        deviceContextProvider.createContextFromCapabilities(capabilities);
-        if (mountInstance != null && schemaContext.isPresent) {
-            mountInstance.schemaContext = schemaContext.get();
-        }
-
-        updateDeviceState(true, capabilities)
-
-        if (mountInstance != null) {
-            confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
-            operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
-            commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this);
-
-            val rpcs = new ArrayList<RpcRegistration>();
-            if (mountInstance != null && schemaContext.isPresent) {
-                for (rpc : mountInstance.schemaContext.operations) {
-                    rpcs.add(mountInstance.addRpcImplementation(rpc.QName, this));
-                }
-            }
-            rpcReg = rpcs
-        }
-    }
-
-    private def updateDeviceState(boolean up, Set<QName> capabilities) {
-        val transaction = dataBroker.beginTransaction
-
-        val it = ImmutableCompositeNode.builder
-        setQName(INVENTORY_NODE)
-        addLeaf(INVENTORY_ID, name)
-        addLeaf(INVENTORY_CONNECTED, up)
-
-        logger.debug("Client capabilities {}", capabilities)
-        for (capability : capabilities) {
-            addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability)
-        }
-
-        logger.debug("Update device state transaction " + transaction.identifier + " putting operational data started.")
-        transaction.removeOperationalData(path)
-        transaction.putOperationalData(path, it.toInstance)
-        logger.debug("Update device state transaction " + transaction.identifier + " putting operational data ended.")
-
-        // FIXME: this has to be asynchronous
-        val transactionStatus = transaction.commit.get;
-
-        if (transactionStatus.successful) {
-            logger.debug("Update device state transaction " + transaction.identifier + " SUCCESSFUL.")
-        } else {
-            logger.debug("Update device state transaction " + transaction.identifier + " FAILED!")
-            logger.debug("Update device state transaction status " + transaction.status)
-        }
-    }
-
-    override readConfigurationData(InstanceIdentifier path) {
-        val result = invokeRpc(NETCONF_GET_CONFIG_QNAME,
-            wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure())).get();
-        val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
-        return data?.findNode(path) as CompositeNode;
-    }
-
-    override readOperationalData(InstanceIdentifier path) {
-        val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure())).get();
-        val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
-        return data?.findNode(path) as CompositeNode;
-    }
-
-    override getSupportedRpcs() {
-        Collections.emptySet;
-    }
-
-    override invokeRpc(QName rpc, CompositeNode input) {
-        return listener.sendRequest(rpc.toRpcMessage(input,schemaContext));
-    }
-
-    override getProviderFunctionality() {
-        Collections.emptySet
-    }
-
-    override onSessionInitiated(ProviderSession session) {
-        dataBroker = session.getService(DataBrokerService);
-
-        val transaction = dataBroker.beginTransaction
-        if (transaction.operationalNodeNotExisting) {
-            transaction.putOperationalData(path, nodeWithId)
-        }
-        if (transaction.configurationNodeNotExisting) {
-            transaction.putConfigurationData(path, nodeWithId)
-        }
-        transaction.commit().get();
-        mountService = session.getService(MountProvisionService);
-        mountInstance = mountService?.createOrGetMountPoint(path);
-    }
-
-    def getNodeWithId() {
-        val id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
-        return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.singletonList(id));
-    }
-
-    def boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
-        return null === transaction.readConfigurationData(path);
-    }
-
-    def boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
-        return null === transaction.readOperationalData(path);
-    }
-
-    static def Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
-
-        var Node<?> current = node;
-        for (arg : identifier.path) {
-            if (current instanceof SimpleNode<?>) {
-                return null;
-            } else if (current instanceof CompositeNode) {
-                val currentComposite = (current as CompositeNode);
-
-                current = currentComposite.getFirstCompositeByName(arg.nodeType);
-                if(current == null) {
-                    current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision());
-                }
-                if(current == null) {
-                    current = currentComposite.getFirstSimpleByName(arg.nodeType);
-                }
-                if (current == null) {
-                    current = currentComposite.getFirstSimpleByName(arg.nodeType.withoutRevision());
-                } if (current == null) {
-                    return null;
-                }
-            }
-        }
-        return current;
-    }
-
-    override requestCommit(DataModification<InstanceIdentifier, CompositeNode> modification) {
-        val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification, true);
-        twoPhaseCommit.prepare()
-        return twoPhaseCommit;
-    }
-
-    def getCapabilities(Collection<String> capabilities) {
-        return FluentIterable.from(capabilities).filter[
-                contains("?") && contains("module=") && contains("revision=")].transform [
-                val parts = split("\\?");
-                val namespace = parts.get(0);
-                val queryParams = FluentIterable.from(parts.get(1).split("&"));
-                var revision = queryParams.findFirst[startsWith("revision=")]?.replaceAll("revision=", "");
-                val moduleName = queryParams.findFirst[startsWith("module=")]?.replaceAll("module=", "");
-                if (revision === null) {
-                    logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
-                    revision = queryParams.findFirst[startsWith("&amp;revision=")]?.replaceAll("revision=", "");
-                    if (revision != null) {
-                        logger.warn("Netconf device returned revision incorectly escaped for {}", it)
-                    }
-                }
-                if (revision == null) {
-                    return QName.create(URI.create(namespace), null, moduleName);
-                }
-                return QName.create(namespace, revision, moduleName);
-            ].toSet();
-    }
-
-    override close() {
-        bringDown()
-    }
-}
-
-package class NetconfDeviceSchemaContextProvider {
-
-    @Property
-    val NetconfDevice device;
-
-    @Property
-    val SchemaSourceProvider<InputStream> sourceProvider;
-
-    @Property
-    var Optional<SchemaContext> currentContext;
-
-    new(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
-        _device = device
-        _sourceProvider = sourceProvider
-        _currentContext = Optional.absent();
-    }
-
-    def createContextFromCapabilities(Iterable<QName> capabilities) {
-        val sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider)
-        if (!sourceContext.missingSources.empty) {
-            device.logger.warn("Sources for following models are missing {}", sourceContext.missingSources);
-        }
-        device.logger.debug("Trying to create schema context from {}", sourceContext.validSources)
-        val modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
-        if (!sourceContext.validSources.empty) {
-            val schemaContext = tryToCreateContext(modelsToParse);
-            currentContext = Optional.fromNullable(schemaContext);
-        } else {
-            currentContext = Optional.absent();
-        }
-        if (currentContext.present) {
-            device.logger.debug("Schema context successfully created.");
-        }
-
-    }
-
-    def SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
-        val parser = new YangParserImpl();
-        try {
-
-            val models = parser.parseYangModelsFromStreams(modelsToParse);
-            val result = parser.resolveSchemaContext(models);
-            return result;
-        } catch (Exception e) {
-            device.logger.debug("Error occured during parsing YANG schemas", e);
-            return null;
-        }
-    }
-}
index 8c65aa309f125bd1d17bb7efde317f54a0b6cc60..d6bf868b75dfaa55059e19f05bdb5bf924f08306 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.sal.connect.netconf;
 
+import com.google.common.collect.Sets;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
 
@@ -39,6 +40,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 
 class NetconfDeviceListener implements NetconfClientSessionListener {
+
     private static final class Request {
         final UncancellableFuture<RpcResult<CompositeNode>> future;
         final NetconfMessage request;
@@ -64,6 +66,7 @@ class NetconfDeviceListener implements NetconfClientSessionListener {
                 device.getName(), device.getSocketAddress(), session.getSessionId());
 
         this.session = session;
+
         final Set<QName> caps = device.getCapabilities(session.getServerCapabilities());
         LOG.trace("Server {} advertized capabilities {}", device.getName(), caps);
 
@@ -71,18 +74,24 @@ class NetconfDeviceListener implements NetconfClientSessionListener {
         final SchemaSourceProvider<String> delegate;
         if (NetconfRemoteSchemaSourceProvider.isSupportedFor(caps)) {
             delegate = new NetconfRemoteSchemaSourceProvider(device);
-            // FIXME parsed caps contain only module-based capabilities
+            // FIXME caps do not contain urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring, since it is filtered out in getCapabilitites
         } else if(session.getServerCapabilities().contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.getNamespace().toString())) {
             delegate = new NetconfRemoteSchemaSourceProvider(device);
         } else {
             LOG.info("Netconf server {} does not support IETF Netconf Monitoring", device.getName());
-            delegate = SchemaSourceProviders.<String>noopProvider();
+            delegate = SchemaSourceProviders.noopProvider();
         }
 
-        device.bringUp(delegate, caps);
+        device.bringUp(delegate, caps, isRollbackSupported(session.getServerCapabilities()));
 
     }
 
+    private static boolean isRollbackSupported(final Collection<String> serverCapabilities) {
+        // TODO rollback capability cannot be searched for in Set<QName> caps
+        // since this set does not contain module-less capabilities
+        return Sets.newHashSet(serverCapabilities).contains(NetconfMapping.NETCONF_ROLLBACK_ON_ERROR_URI.toString());
+    }
+
     private synchronized void tearDown(final Exception e) {
         session = null;
 
@@ -137,9 +146,22 @@ class NetconfDeviceListener implements NetconfClientSessionListener {
             requests.poll();
             LOG.debug("Matched {} to {}", r.request, message);
 
-            // FIXME: this can throw exceptions, which should result
-            // in the future failing
-            NetconfMapping.checkValidReply(r.request, message);
+            try {
+                NetconfMapping.checkValidReply(r.request, message);
+            } catch (IllegalStateException e) {
+                LOG.warn("Invalid request-reply match, reply message contains different message-id", e);
+                r.future.setException(e);
+                return;
+            }
+
+            try {
+                NetconfMapping.checkSuccessReply(message);
+            } catch (IllegalStateException e) {
+                LOG.warn("Error reply from remote device", e);
+                r.future.setException(e);
+                return;
+            }
+
             r.future.set(Rpcs.getRpcResult(true, NetconfMapping.toNotificationNode(message, device.getSchemaContext()),
                     Collections.<RpcError>emptyList()));
         } else {
index 5f14c264edb4cac6ab3dc1b81ccd8f2b1744a598..8a74b17ac46449c806f745c48e0892a59ec9374b 100644 (file)
@@ -11,9 +11,11 @@ import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NET
 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_COMMIT_QNAME;
 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_CONFIG_QNAME;
 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_EDIT_CONFIG_QNAME;
+import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_ERROR_OPTION_QNAME;
 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_OPERATION_QNAME;
 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_RUNNING_QNAME;
 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_TARGET_QNAME;
+import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.ROLLBACK_ON_ERROR_OPTION;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -33,6 +35,7 @@ import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifie
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.Node;
 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
+import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl;
 import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,13 +50,15 @@ class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransaction<In
     private final DataModification<InstanceIdentifier, CompositeNode> modification;
     private final NetconfDevice device;
     private final boolean candidateSupported;
+    private final boolean rollbackSupported;
 
     public NetconfDeviceTwoPhaseCommitTransaction(NetconfDevice device,
             DataModification<InstanceIdentifier, CompositeNode> modification,
-            boolean candidateSupported) {
+            boolean candidateSupported, boolean rollbackOnErrorSupported) {
         this.device = Preconditions.checkNotNull(device);
         this.modification = Preconditions.checkNotNull(modification);
         this.candidateSupported = candidateSupported;
+        this.rollbackSupported = rollbackOnErrorSupported;
     }
 
     void prepare() throws InterruptedException, ExecutionException {
@@ -91,7 +96,14 @@ class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransaction<In
         } else {
             targetNode = ImmutableCompositeNode.create(NETCONF_RUNNING_QNAME, ImmutableList.<Node<?>>of());
         }
+
         Node<?> targetWrapperNode = ImmutableCompositeNode.create(NETCONF_TARGET_QNAME, ImmutableList.<Node<?>>of(targetNode));
+
+        if(rollbackSupported) {
+            LOG.debug("Rollback-on-error supported, setting {} to {}", NETCONF_ERROR_OPTION_QNAME, ROLLBACK_ON_ERROR_OPTION);
+            ret.addLeaf(NETCONF_ERROR_OPTION_QNAME, ROLLBACK_ON_ERROR_OPTION);
+        }
+
         ret.add(targetWrapperNode);
         return ret;
     }
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfMapping.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfMapping.java
new file mode 100644 (file)
index 0000000..3e72914
--- /dev/null
@@ -0,0 +1,270 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connect.netconf;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.activation.UnsupportedDataTypeException;
+import javax.annotation.Nullable;
+
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.messages.NetconfMessageUtil;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
+import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
+import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl;
+import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
+import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
+import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+public class NetconfMapping {
+
+    public static URI NETCONF_URI = URI.create("urn:ietf:params:xml:ns:netconf:base:1.0");
+    public static String NETCONF_MONITORING_URI = "urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring";
+    public static URI NETCONF_NOTIFICATION_URI = URI.create("urn:ietf:params:xml:ns:netconf:notification:1.0");
+    public static URI NETCONF_ROLLBACK_ON_ERROR_URI = URI.create("urn:ietf:params:netconf:capability:rollback-on-error:1.0");
+
+    public static QName NETCONF_QNAME = QName.create(NETCONF_URI, null, "netconf");
+    public static QName NETCONF_RPC_QNAME = QName.create(NETCONF_QNAME, "rpc");
+    public static QName NETCONF_GET_QNAME = QName.create(NETCONF_QNAME, "get");
+    public static QName NETCONF_FILTER_QNAME = QName.create(NETCONF_QNAME, "filter");
+    public static QName NETCONF_TYPE_QNAME = QName.create(NETCONF_QNAME, "type");
+    public static QName NETCONF_GET_CONFIG_QNAME = QName.create(NETCONF_QNAME, "get-config");
+    public static QName NETCONF_EDIT_CONFIG_QNAME = QName.create(NETCONF_QNAME, "edit-config");
+    public static QName NETCONF_DELETE_CONFIG_QNAME = QName.create(NETCONF_QNAME, "delete-config");
+    public static QName NETCONF_OPERATION_QNAME = QName.create(NETCONF_QNAME, "operation");
+    public static QName NETCONF_COMMIT_QNAME = QName.create(NETCONF_QNAME, "commit");
+
+    public static QName NETCONF_CONFIG_QNAME = QName.create(NETCONF_QNAME, "config");
+    public static QName NETCONF_SOURCE_QNAME = QName.create(NETCONF_QNAME, "source");
+    public static QName NETCONF_TARGET_QNAME = QName.create(NETCONF_QNAME, "target");
+
+    public static QName NETCONF_CANDIDATE_QNAME = QName.create(NETCONF_QNAME, "candidate");
+    public static QName NETCONF_RUNNING_QNAME = QName.create(NETCONF_QNAME, "running");
+
+    public static QName NETCONF_ERROR_OPTION_QNAME = QName.create(NETCONF_QNAME, "error-option");
+    public static String ROLLBACK_ON_ERROR_OPTION = "rollback-on-error";
+
+    public static QName NETCONF_RPC_REPLY_QNAME = QName.create(NETCONF_QNAME, "rpc-reply");
+    public static QName NETCONF_OK_QNAME = QName.create(NETCONF_QNAME, "ok");
+    public static QName NETCONF_DATA_QNAME = QName.create(NETCONF_QNAME, "data");
+    public static QName NETCONF_CREATE_SUBSCRIPTION_QNAME = QName.create(NETCONF_NOTIFICATION_URI, null,
+            "create-subscription");
+    public static QName NETCONF_CANCEL_SUBSCRIPTION_QNAME = QName.create(NETCONF_NOTIFICATION_URI, null,
+            "cancel-subscription");
+    public static QName IETF_NETCONF_MONITORING_MODULE = QName.create(NETCONF_MONITORING_URI, "2010-10-04",
+            "ietf-netconf-monitoring");
+
+    static List<Node<?>> RUNNING = Collections.<Node<?>> singletonList(new SimpleNodeTOImpl(NETCONF_RUNNING_QNAME,
+            null, null));
+
+    public static CompositeNode CONFIG_SOURCE_RUNNING = new CompositeNodeTOImpl(NETCONF_SOURCE_QNAME, null, RUNNING);
+
+    static AtomicInteger messageId = new AtomicInteger(0);
+
+    static Node<?> toFilterStructure(InstanceIdentifier identifier) {
+        Node<?> previous = null;
+        if (identifier.getPath().isEmpty()) {
+            return null;
+        }
+
+        for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument component : Lists
+                .reverse(identifier.getPath())) {
+            previous = toNode(component, previous);
+        }
+        return filter("subtree", previous);
+    }
+
+    static Node<?> toNode(NodeIdentifierWithPredicates argument, Node<?> node) {
+        List<Node<?>> list = new ArrayList<>();
+        for (Map.Entry<QName, Object> arg : argument.getKeyValues().entrySet()) {
+            list.add(new SimpleNodeTOImpl(arg.getKey(), null, arg.getValue()));
+        }
+        if (node != null) {
+            list.add(node);
+        }
+        return new CompositeNodeTOImpl(argument.getNodeType(), null, list);
+    }
+
+    static Node<?> toNode(PathArgument argument, Node<?> node) {
+        if (node != null) {
+            return new CompositeNodeTOImpl(argument.getNodeType(), null, Collections.<Node<?>> singletonList(node));
+        } else {
+            return new SimpleNodeTOImpl(argument.getNodeType(), null, null);
+        }
+    }
+
+    static CompositeNode toCompositeNode(NetconfMessage message, Optional<SchemaContext> ctx) {
+        // TODO: implement general normalization to normalize incoming Netconf
+        // Message
+        // for Schema Context counterpart
+        return null;
+    }
+
+    static CompositeNode toNotificationNode(NetconfMessage message, Optional<SchemaContext> ctx) {
+        if (ctx.isPresent()) {
+            SchemaContext schemaContext = ctx.get();
+            Set<NotificationDefinition> notifications = schemaContext.getNotifications();
+            Document document = message.getDocument();
+            return XmlDocumentUtils.notificationToDomNodes(document, Optional.fromNullable(notifications));
+        }
+        return null;
+    }
+
+    static NetconfMessage toRpcMessage(QName rpc, CompositeNode node, Optional<SchemaContext> ctx) {
+        CompositeNodeTOImpl rpcPayload = wrap(NETCONF_RPC_QNAME, flattenInput(node));
+        Document w3cPayload = null;
+        try {
+            w3cPayload = XmlDocumentUtils.toDocument(rpcPayload, XmlDocumentUtils.defaultValueCodecProvider());
+        } catch (UnsupportedDataTypeException e) {
+            throw new IllegalArgumentException("Unable to create message", e);
+        }
+        w3cPayload.getDocumentElement().setAttribute("message-id", "m-" + messageId.getAndIncrement());
+        return new NetconfMessage(w3cPayload);
+    }
+
+    static CompositeNode flattenInput(final CompositeNode node) {
+        final QName inputQName = QName.create(node.getNodeType(), "input");
+        CompositeNode input = node.getFirstCompositeByName(inputQName);
+        if (input == null)
+            return node;
+        if (input instanceof CompositeNode) {
+
+            List<Node<?>> nodes = ImmutableList.<Node<?>> builder() //
+                    .addAll(input.getChildren()) //
+                    .addAll(Collections2.filter(node.getChildren(), new Predicate<Node<?>>() {
+                        @Override
+                        public boolean apply(@Nullable final Node<?> input) {
+                            return input.getNodeType() != inputQName;
+                        }
+                    })) //
+                    .build();
+
+            return ImmutableCompositeNode.create(node.getNodeType(), nodes);
+        }
+
+        return input;
+    }
+
+    static RpcResult<CompositeNode> toRpcResult(NetconfMessage message, final QName rpc, Optional<SchemaContext> context) {
+        CompositeNode rawRpc;
+        if (context.isPresent())
+            if (isDataRetrieQNameReply(rpc)) {
+
+                Element xmlData = getDataSubtree(message.getDocument());
+
+                List<org.opendaylight.yangtools.yang.data.api.Node<?>> dataNodes = XmlDocumentUtils.toDomNodes(xmlData,
+                        Optional.of(context.get().getDataDefinitions()));
+
+                CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder();
+                it.setQName(NETCONF_RPC_REPLY_QNAME);
+                it.add(ImmutableCompositeNode.create(NETCONF_DATA_QNAME, dataNodes));
+
+                rawRpc = it.toInstance();
+                // sys(xmlData)
+            } else {
+                RpcDefinition rpcSchema = Iterables.find(context.get().getOperations(), new Predicate<RpcDefinition>() {
+                    @Override
+                    public boolean apply(final RpcDefinition input) {
+                        return rpc == input.getQName();
+                    }
+                });
+                rawRpc = (CompositeNode) toCompositeNode(message.getDocument());
+            }
+        else {
+            rawRpc = (CompositeNode) toCompositeNode(message.getDocument());
+        }
+        // rawRpc.
+        return Rpcs.getRpcResult(true, rawRpc, Collections.<RpcError> emptySet());
+    }
+
+    static Element getDataSubtree(Document doc) {
+        return (Element) doc.getElementsByTagNameNS(NETCONF_URI.toString(), "data").item(0);
+    }
+
+    static boolean isDataRetrieQNameReply(QName it) {
+        return NETCONF_URI == it.getNamespace()
+                && (it.getLocalName() == NETCONF_GET_CONFIG_QNAME.getLocalName() || it.getLocalName() == NETCONF_GET_QNAME
+                        .getLocalName());
+    }
+
+    static CompositeNodeTOImpl wrap(QName name, Node<?> node) {
+        if (node != null) {
+            return new CompositeNodeTOImpl(name, null, Collections.<Node<?>> singletonList(node));
+        } else {
+            return new CompositeNodeTOImpl(name, null, Collections.<Node<?>> emptyList());
+        }
+    }
+
+    static CompositeNodeTOImpl wrap(QName name, Node<?> additional, Node<?> node) {
+        if (node != null) {
+            return new CompositeNodeTOImpl(name, null, ImmutableList.of(additional, node));
+        } else {
+            return new CompositeNodeTOImpl(name, null, ImmutableList.<Node<?>> of(additional));
+        }
+    }
+
+    static ImmutableCompositeNode filter(String type, Node<?> node) {
+        CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder(); //
+        it.setQName(NETCONF_FILTER_QNAME);
+        it.setAttribute(NETCONF_TYPE_QNAME, type);
+        if (node != null) {
+            return it.add(node).toInstance();
+        } else {
+            return it.toInstance();
+        }
+    }
+
+    public static Node<?> toCompositeNode(Document document) {
+        return XmlDocumentUtils.toDomNode(document);
+    }
+
+    public static void checkValidReply(NetconfMessage input, NetconfMessage output) {
+        String inputMsgId = input.getDocument().getDocumentElement().getAttribute("message-id");
+        String outputMsgId = output.getDocument().getDocumentElement().getAttribute("message-id");
+
+        if(inputMsgId.equals(outputMsgId) == false) {
+            String requestXml = XmlUtil.toString(input.getDocument());
+            String responseXml = XmlUtil.toString(output.getDocument());
+            throw new IllegalStateException(String.format("Rpc request and reply message IDs must be same. Request: %s, response: %s", requestXml, responseXml));
+        }
+    }
+
+    public static void checkSuccessReply(NetconfMessage output) {
+        if(NetconfMessageUtil.isErrorMessage(output)) {
+            throw new IllegalStateException(String.format("Response contains error: %s", XmlUtil.toString(output.getDocument())));
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfMapping.xtend b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfMapping.xtend
deleted file mode 100644 (file)
index 228a01e..0000000
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.connect.netconf
-
-import com.google.common.base.Optional
-import com.google.common.base.Preconditions
-import com.google.common.collect.ImmutableList
-import java.net.URI
-import java.util.ArrayList
-import java.util.Collections
-import java.util.List
-import java.util.Set
-import java.util.concurrent.atomic.AtomicInteger
-import org.opendaylight.controller.netconf.api.NetconfMessage
-import org.opendaylight.controller.sal.common.util.Rpcs
-import org.opendaylight.yangtools.yang.common.QName
-import org.opendaylight.yangtools.yang.common.RpcResult
-import org.opendaylight.yangtools.yang.data.api.CompositeNode
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument
-import org.opendaylight.yangtools.yang.data.api.Node
-import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl
-import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode
-import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
-import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils
-import org.opendaylight.yangtools.yang.model.api.NotificationDefinition
-import org.opendaylight.yangtools.yang.model.api.SchemaContext
-import org.w3c.dom.Document
-import org.w3c.dom.Element
-
-class NetconfMapping {
-
-    public static val NETCONF_URI = URI.create("urn:ietf:params:xml:ns:netconf:base:1.0")
-    public static val NETCONF_MONITORING_URI = "urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring"
-    public static val NETCONF_NOTIFICATION_URI = URI.create("urn:ietf:params:xml:ns:netconf:notification:1.0")
-
-
-    public static val NETCONF_QNAME = QName.create(NETCONF_URI, null, "netconf");
-    public static val NETCONF_RPC_QNAME = QName.create(NETCONF_QNAME, "rpc");
-    public static val NETCONF_GET_QNAME = QName.create(NETCONF_QNAME, "get");
-    public static val NETCONF_FILTER_QNAME = QName.create(NETCONF_QNAME, "filter");
-    public static val NETCONF_TYPE_QNAME = QName.create(NETCONF_QNAME, "type");
-    public static val NETCONF_GET_CONFIG_QNAME = QName.create(NETCONF_QNAME, "get-config");
-    public static val NETCONF_EDIT_CONFIG_QNAME = QName.create(NETCONF_QNAME, "edit-config");
-    public static val NETCONF_DELETE_CONFIG_QNAME = QName.create(NETCONF_QNAME, "delete-config");
-    public static val NETCONF_OPERATION_QNAME = QName.create(NETCONF_QNAME, "operation");
-    public static val NETCONF_COMMIT_QNAME = QName.create(NETCONF_QNAME, "commit");
-
-    public static val NETCONF_CONFIG_QNAME = QName.create(NETCONF_QNAME, "config");
-    public static val NETCONF_SOURCE_QNAME = QName.create(NETCONF_QNAME, "source");
-    public static val NETCONF_TARGET_QNAME = QName.create(NETCONF_QNAME, "target");
-
-    public static val NETCONF_CANDIDATE_QNAME = QName.create(NETCONF_QNAME, "candidate");
-    public static val NETCONF_RUNNING_QNAME = QName.create(NETCONF_QNAME, "running");
-
-
-    public static val NETCONF_RPC_REPLY_QNAME = QName.create(NETCONF_QNAME, "rpc-reply");
-    public static val NETCONF_OK_QNAME = QName.create(NETCONF_QNAME, "ok");
-    public static val NETCONF_DATA_QNAME = QName.create(NETCONF_QNAME, "data");
-    public static val NETCONF_CREATE_SUBSCRIPTION_QNAME = QName.create(NETCONF_NOTIFICATION_URI,null,"create-subscription");
-    public static val NETCONF_CANCEL_SUBSCRIPTION_QNAME = QName.create(NETCONF_NOTIFICATION_URI,null,"cancel-subscription");
-    public static val IETF_NETCONF_MONITORING_MODULE = QName.create(NETCONF_MONITORING_URI, "2010-10-04","ietf-netconf-monitoring");
-
-    static List<Node<?>> RUNNING = Collections.<Node<?>>singletonList(
-        new SimpleNodeTOImpl(NETCONF_RUNNING_QNAME, null, null));
-    public static val CONFIG_SOURCE_RUNNING = new CompositeNodeTOImpl(NETCONF_SOURCE_QNAME, null, RUNNING);
-
-    static val messageId = new AtomicInteger(0);
-
-    static def Node<?> toFilterStructure(InstanceIdentifier identifier) {
-        var Node<?> previous = null;
-        if(identifier.path.empty) {
-            return null;
-        }
-
-        for (component : identifier.path.reverseView) {
-            val Node<?> current = component.toNode(previous);
-            previous = current;
-        }
-        return filter("subtree",previous);
-    }
-
-    static def dispatch Node<?> toNode(NodeIdentifierWithPredicates argument, Node<?> node) {
-        val list = new ArrayList<Node<?>>();
-        for (arg : argument.keyValues.entrySet) {
-            list.add = new SimpleNodeTOImpl(arg.key, null, arg.value);
-        }
-        if (node != null) {
-            list.add(node);
-        }
-        return new CompositeNodeTOImpl(argument.nodeType, null, list)
-    }
-
-    static def dispatch Node<?> toNode(PathArgument argument, Node<?> node) {
-        if (node != null) {
-            return new CompositeNodeTOImpl(argument.nodeType, null, Collections.singletonList(node));
-        } else {
-            return new SimpleNodeTOImpl(argument.nodeType, null, null);
-        }
-    }
-
-    static def CompositeNode toCompositeNode(NetconfMessage message,Optional<SchemaContext> ctx) {
-        //TODO: implement general normalization to normalize incoming Netconf Message
-        // for Schema Context counterpart
-        return null
-    }
-
-    static def CompositeNode toNotificationNode(NetconfMessage message,Optional<SchemaContext> ctx) {
-        if (ctx.present) {
-            val schemaContext = ctx.get
-            val notifications = schemaContext.notifications
-            val document = message.document
-            return XmlDocumentUtils.notificationToDomNodes(document, Optional.<Set<NotificationDefinition>>fromNullable(notifications))
-        }
-        return null
-    }
-
-    static def NetconfMessage toRpcMessage(QName rpc, CompositeNode node,Optional<SchemaContext> ctx) {
-        val rpcPayload = wrap(NETCONF_RPC_QNAME, flattenInput(node))
-        val w3cPayload = XmlDocumentUtils.toDocument(rpcPayload, XmlDocumentUtils.defaultValueCodecProvider)
-        w3cPayload.documentElement.setAttribute("message-id", "m-" + messageId.andIncrement)
-        return new NetconfMessage(w3cPayload);
-    }
-
-    def static flattenInput(CompositeNode node) {
-        val inputQName = QName.create(node.nodeType,"input");
-        val input = node.getFirstCompositeByName(inputQName);
-        if(input == null) return node;
-        if(input instanceof CompositeNode) {
-
-            val nodes = ImmutableList.builder() //
-                .addAll(input.children) //
-                .addAll(node.children.filter[nodeType != inputQName]) //
-                .build()
-            return ImmutableCompositeNode.create(node.nodeType,nodes);
-        }
-
-    }
-
-    static def RpcResult<CompositeNode> toRpcResult(NetconfMessage message,QName rpc,Optional<SchemaContext> context) {
-        var CompositeNode rawRpc;
-        if(context.present) {
-            if(isDataRetrievalReply(rpc)) {
-
-                val xmlData = message.document.dataSubtree
-                val dataNodes = XmlDocumentUtils.toDomNodes(xmlData, Optional.of(context.get.dataDefinitions))
-
-                val it = ImmutableCompositeNode.builder()
-                setQName(NETCONF_RPC_REPLY_QNAME)
-                add(ImmutableCompositeNode.create(NETCONF_DATA_QNAME, dataNodes));
-
-                rawRpc = it.toInstance;
-                //sys(xmlData)
-            } else {
-                val rpcSchema = context.get.operations.findFirst[QName == rpc]
-                rawRpc = message.document.toCompositeNode() as CompositeNode;
-            }
-        } else {
-            rawRpc = message.document.toCompositeNode() as CompositeNode;
-        }
-        //rawRpc.
-        return Rpcs.getRpcResult(true, rawRpc, Collections.emptySet());
-    }
-
-    def static Element getDataSubtree(Document doc) {
-        doc.getElementsByTagNameNS(NETCONF_URI.toString,"data").item(0) as Element
-    }
-
-    def static boolean isDataRetrievalReply(QName it) {
-        return NETCONF_URI == namespace && ( localName == NETCONF_GET_CONFIG_QNAME.localName || localName == NETCONF_GET_QNAME.localName)
-    }
-
-    static def wrap(QName name, Node<?> node) {
-        if (node != null) {
-            return new CompositeNodeTOImpl(name, null, Collections.singletonList(node));
-        } else {
-            return new CompositeNodeTOImpl(name, null, Collections.emptyList());
-        }
-    }
-
-    static def wrap(QName name, Node<?> additional, Node<?> node) {
-        if (node != null) {
-            return new CompositeNodeTOImpl(name, null, ImmutableList.of(additional, node));
-        } else {
-            return new CompositeNodeTOImpl(name, null, ImmutableList.of(additional));
-        }
-    }
-
-    static def filter(String type, Node<?> node) {
-        val it = ImmutableCompositeNode.builder(); //
-        setQName(NETCONF_FILTER_QNAME);
-        setAttribute(NETCONF_TYPE_QNAME,type);
-        if (node != null) {
-            return add(node).toInstance();
-        } else {
-            return toInstance();
-        }
-    }
-
-    public static def Node<?> toCompositeNode(Document document) {
-        return XmlDocumentUtils.toDomNode(document) as Node<?>
-    }
-
-    public static def checkValidReply(NetconfMessage input, NetconfMessage output) {
-        val inputMsgId = input.document.documentElement.getAttribute("message-id")
-        val outputMsgId = output.document.documentElement.getAttribute("message-id")
-        Preconditions.checkState(inputMsgId == outputMsgId,"Rpc request and reply message IDs must be same.");
-
-    }
-
-}
index f49771a953b5ad93d6d60e6cc73f1c3a5473fb6b..065d0ee7ba2a29c55bed1ef157be991e9f2f3a7c 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.sample.l2switch.md.flow;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+
 import org.opendaylight.controller.sample.l2switch.md.topology.NetworkGraphService;
 import org.opendaylight.controller.sample.l2switch.md.util.InstanceIdentifierUtils;
 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
@@ -25,6 +26,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.ta
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowCookie;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowModFlags;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.InstructionsBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
@@ -261,7 +263,7 @@ public class FlowWriterServiceImpl implements FlowWriterService {
         .setBufferId(0L) //
         .setHardTimeout(0) //
         .setIdleTimeout(0) //
-        .setCookie(BigInteger.valueOf(flowCookieInc.getAndIncrement()))
+        .setCookie(new FlowCookie(BigInteger.valueOf(flowCookieInc.getAndIncrement())))
         .setFlags(new FlowModFlags(false, false, false, false, false));
 
     return macToMacFlow.build();