Merge "Do not use protobuf serialization for FindPrimary and it's responses"
[controller.git] / opendaylight / sal / implementation / src / main / java / org / opendaylight / controller / sal / implementation / internal / ReadService.java
index edc91173890970b12680fa27628fe378a9ba1490..356c0e57c887f29f5d448ba523976f8fe077b5da 100644 (file)
@@ -1,6 +1,5 @@
-
 /*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2013-2014 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -9,12 +8,15 @@
 
 package org.opendaylight.controller.sal.implementation.internal;
 
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
 
 import org.eclipse.osgi.framework.console.CommandInterpreter;
 import org.eclipse.osgi.framework.console.CommandProvider;
@@ -25,20 +27,21 @@ import org.opendaylight.controller.sal.action.Output;
 import org.opendaylight.controller.sal.action.PopVlan;
 import org.opendaylight.controller.sal.core.ConstructionException;
 import org.opendaylight.controller.sal.core.Node;
-import org.opendaylight.controller.sal.core.NodeTable;
-import org.opendaylight.controller.sal.core.NodeConnector;
 import org.opendaylight.controller.sal.core.Node.NodeIDType;
+import org.opendaylight.controller.sal.core.NodeConnector;
+import org.opendaylight.controller.sal.core.NodeTable;
 import org.opendaylight.controller.sal.flowprogrammer.Flow;
 import org.opendaylight.controller.sal.match.Match;
 import org.opendaylight.controller.sal.match.MatchType;
 import org.opendaylight.controller.sal.reader.FlowOnNode;
 import org.opendaylight.controller.sal.reader.IPluginInReadService;
+import org.opendaylight.controller.sal.reader.IPluginOutReadService;
 import org.opendaylight.controller.sal.reader.IReadService;
+import org.opendaylight.controller.sal.reader.IReadServiceListener;
 import org.opendaylight.controller.sal.reader.NodeConnectorStatistics;
 import org.opendaylight.controller.sal.reader.NodeDescription;
 import org.opendaylight.controller.sal.reader.NodeTableStatistics;
 import org.opendaylight.controller.sal.utils.EtherTypes;
-import org.opendaylight.controller.sal.utils.GlobalConstants;
 import org.opendaylight.controller.sal.utils.IPProtocols;
 import org.opendaylight.controller.sal.utils.NodeConnectorCreator;
 import org.opendaylight.controller.sal.utils.NodeCreator;
@@ -49,19 +52,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * The SAL Read Service. It dispatches the read request to
- * the proper SDN protocol plugin
- *
- *
- *
+ * The SAL Read Service. Dispatches read requests to the proper SDN protocol
+ * plugin, and notifies any listeners on updates from any plugin readers
  */
-public class ReadService implements IReadService, CommandProvider {
+public class ReadService implements IReadService, CommandProvider, IPluginOutReadService {
 
-    protected static final Logger logger = LoggerFactory
-            .getLogger(ReadService.class);
-    private ConcurrentHashMap<String, IPluginInReadService>
-        pluginReader =
-        new ConcurrentHashMap<String, IPluginInReadService>();
+    protected static final Logger logger = LoggerFactory.getLogger(ReadService.class);
+    private ConcurrentHashMap<String, ProtocolService<IPluginInReadService>> pluginReader =
+        new ConcurrentHashMap<String, ProtocolService<IPluginInReadService>>();
+    private Set<IReadServiceListener> readerListeners =
+        new CopyOnWriteArraySet<IReadServiceListener>();
 
     /**
      * Function called by the dependency manager when all the required
@@ -81,6 +81,7 @@ public class ReadService implements IReadService, CommandProvider {
         // In case of plugin disactivating make sure we clear the
         // dependencies
         this.pluginReader.clear();
+        this.readerListeners.clear();
     }
 
     /**
@@ -103,129 +104,99 @@ public class ReadService implements IReadService, CommandProvider {
     }
 
     // Set the reference to the plugin flow Reader service
-    public void setService(Map props, IPluginInReadService s) {
-        if (this.pluginReader == null) {
-            logger.error("pluginReader store null");
-            return;
-        }
-
-        logger.trace("Got a service set request {}", s);
-        String type = null;
-        for (Object e : props.entrySet()) {
-            Map.Entry entry = (Map.Entry) e;
-            logger.trace("Prop key:({}) value:({})", entry.getKey(),
-                    entry.getValue());
-        }
-
-        Object value = props.get(GlobalConstants.PROTOCOLPLUGINTYPE.toString());
-        if (value instanceof String) {
-            type = (String) value;
-        }
-        if (type == null) {
-            logger.error("Received a pluginReader without any "
-                    + "protocolPluginType provided");
-        } else {
-            this.pluginReader.put(type, s);
-            logger.debug("Stored the pluginReader for type: {}", type);
-        }
+    public void setService(Map<?, ?> props, IPluginInReadService s) {
+        ProtocolService.set(this.pluginReader, props, s, logger);
     }
 
-    public void unsetService(Map props, IPluginInReadService s) {
-        if (this.pluginReader == null) {
-            logger.error("pluginReader store null");
-            return;
-        }
+    public void unsetService(Map<?, ?> props, IPluginInReadService s) {
+        ProtocolService.unset(this.pluginReader, props, s, logger);
+    }
 
-        String type = null;
-        logger.debug("Received unsetpluginReader request");
-        for (Object e : props.entrySet()) {
-            Map.Entry entry = (Map.Entry) e;
-            logger.trace("Prop key:({}) value:({})", entry.getKey(),
-                    entry.getValue());
-        }
+    public void setReaderListener(IReadServiceListener service) {
+        logger.trace("Got a listener set request {}", service);
+        this.readerListeners.add(service);
+    }
 
-        Object value = props.get(GlobalConstants.PROTOCOLPLUGINTYPE.toString());
-        if (value instanceof String) {
-            type = (String) value;
-        }
-        if (type == null) {
-            logger.error("Received a pluginReader without any "
-                    + "protocolPluginType provided");
-        } else if (this.pluginReader.get(type).equals(s)) {
-            this.pluginReader.remove(type);
-            logger.debug("Removed the pluginReader for type: {}", type);
-        }
+    public void unsetReaderListener(IReadServiceListener service) {
+        logger.trace("Got a listener Unset request");
+        this.readerListeners.remove(service);
     }
 
     @Override
     public FlowOnNode readFlow(Node node, Flow flow) {
         if (pluginReader != null) {
-            if (this.pluginReader.get(node.getType()) != null) {
-                return this.pluginReader.get(node.getType())
-                        .readFlow(node, flow, true);
+            ProtocolService<IPluginInReadService> service =
+                this.pluginReader.get(node.getType());
+            if (service != null) {
+                return service.getService().readFlow(node, flow, true);
             }
         }
-        logger.warn("Plugin unavailable");
+        logger.warn("Plugin {} unavailable", node.getType());
         return null;
     }
 
     @Override
     public FlowOnNode nonCachedReadFlow(Node node, Flow flow) {
         if (pluginReader != null) {
-            if (this.pluginReader.get(node.getType()) != null) {
-                return this.pluginReader.get(node.getType())
-                        .readFlow(node, flow, false);
+            ProtocolService<IPluginInReadService> service =
+                this.pluginReader.get(node.getType());
+            if (service != null) {
+                return service.getService().readFlow(node, flow, false);
             }
         }
-        logger.warn("Plugin unavailable");
+        logger.warn("Plugin {} unavailable", node.getType());
         return null;
     }
 
     @Override
     public List<FlowOnNode> readAllFlows(Node node) {
         if (pluginReader != null) {
-            if (this.pluginReader.get(node.getType()) != null) {
-                return this.pluginReader.get(node.getType())
-                        .readAllFlow(node, true);
+            ProtocolService<IPluginInReadService> service =
+                this.pluginReader.get(node.getType());
+            if (service != null) {
+                return service.getService().readAllFlow(node, true);
             }
         }
-        logger.warn("Plugin unavailable");
-        return null;
+        logger.warn("Plugin {} unavailable", node.getType());
+        return Collections.emptyList();
     }
 
     @Override
     public List<FlowOnNode> nonCachedReadAllFlows(Node node) {
         if (pluginReader != null) {
-            if (this.pluginReader.get(node.getType()) != null) {
-                return this.pluginReader.get(node.getType())
-                        .readAllFlow(node, false);
+            ProtocolService<IPluginInReadService> service =
+                this.pluginReader.get(node.getType());
+            if (service != null) {
+                return service.getService().readAllFlow(node, false);
             }
         }
-        logger.warn("Plugin unavailable");
-        return null;
+        logger.warn("Plugin {} unavailable", node.getType());
+        return Collections.emptyList();
     }
 
     @Override
     public NodeDescription readDescription(Node node) {
         if (pluginReader != null) {
-            if (this.pluginReader.get(node.getType()) != null) {
-                return this.pluginReader.get(node.getType())
-                        .readDescription(node, true);
+            ProtocolService<IPluginInReadService> service =
+                this.pluginReader.get(node.getType());
+            if (service != null) {
+                return service.getService().readDescription(node, true);
             }
         }
-        logger.warn("Plugin unavailable");
+        logger.warn("Plugin {} unavailable", node.getType());
         return null;
     }
 
     @Override
     public NodeDescription nonCachedReadDescription(Node node) {
         if (pluginReader != null) {
-            if (this.pluginReader.get(node.getType()) != null) {
-                return this.pluginReader.get(node.getType())
-                        .readDescription(node, false);
+            ProtocolService<IPluginInReadService> service =
+                this.pluginReader.get(node.getType());
+            if (service != null) {
+                return service.getService().readDescription(node, false);
             }
         }
-        logger.warn("Plugin unavailable");
+        logger.warn("Plugin {} unavailable", node.getType());
         return null;
     }
 
@@ -233,12 +204,13 @@ public class ReadService implements IReadService, CommandProvider {
     public NodeConnectorStatistics readNodeConnector(NodeConnector connector) {
         Node node = connector.getNode();
         if (pluginReader != null && node != null) {
-            if (this.pluginReader.get(node.getType()) != null) {
-                return this.pluginReader.get(node.getType())
-                        .readNodeConnector(connector, true);
+            ProtocolService<IPluginInReadService> service =
+                this.pluginReader.get(node.getType());
+            if (service != null) {
+                return service.getService().readNodeConnector(connector, true);
             }
         }
-        logger.warn("Plugin unavailable");
+        logger.warn("Plugin {} unavailable", node.getType());
         return null;
     }
 
@@ -247,37 +219,40 @@ public class ReadService implements IReadService, CommandProvider {
             NodeConnector connector) {
         Node node = connector.getNode();
         if (pluginReader != null && node != null) {
-            if (this.pluginReader.get(node.getType()) != null) {
-                return this.pluginReader.get(node.getType())
-                        .readNodeConnector(connector, false);
+            ProtocolService<IPluginInReadService> service =
+                this.pluginReader.get(node.getType());
+            if (service != null) {
+                return service.getService().readNodeConnector(connector, false);
             }
         }
-        logger.warn("Plugin unavailable");
+        logger.warn("Plugin {} unavailable", node.getType());
         return null;
     }
 
     @Override
     public List<NodeConnectorStatistics> readNodeConnectors(Node node) {
         if (pluginReader != null) {
-            if (this.pluginReader.get(node.getType()) != null) {
-                return this.pluginReader.get(node.getType())
-                        .readAllNodeConnector(node, true);
+            ProtocolService<IPluginInReadService> service =
+                this.pluginReader.get(node.getType());
+            if (service != null) {
+                return service.getService().readAllNodeConnector(node, true);
             }
         }
-        logger.warn("Plugin unavailable");
-        return null;
+        logger.warn("Plugin {} unavailable", node.getType());
+        return Collections.emptyList();
     }
 
     @Override
     public List<NodeTableStatistics> readNodeTable(Node node) {
         if (pluginReader != null) {
-            if (this.pluginReader.get(node.getType()) != null) {
-                return this.pluginReader.get(node.getType())
-                        .readAllNodeTable(node, true);
+            ProtocolService<IPluginInReadService> service =
+                this.pluginReader.get(node.getType());
+            if (service != null) {
+                return service.getService().readAllNodeTable(node, true);
             }
         }
-        logger.warn("Plugin unavailable");
-        return null;
+        logger.warn("Plugin {} unavailable", node.getType());
+        return Collections.emptyList();
     }
 
 
@@ -285,12 +260,13 @@ public class ReadService implements IReadService, CommandProvider {
     public NodeTableStatistics nonCachedReadNodeTable(NodeTable table) {
         Node node = table.getNode();
         if (pluginReader != null && node != null) {
-            if (this.pluginReader.get(node.getType()) != null) {
-                return this.pluginReader.get(node.getType())
-                        .readNodeTable(table, false);
+            ProtocolService<IPluginInReadService> service =
+                this.pluginReader.get(node.getType());
+            if (service != null) {
+                return service.getService().readNodeTable(table, false);
             }
         }
-        logger.warn("Plugin unavailable");
+        logger.warn("Plugin {} unavailable", node.getType());
         return null;
     }
 
@@ -298,40 +274,71 @@ public class ReadService implements IReadService, CommandProvider {
     public NodeTableStatistics readNodeTable(NodeTable table) {
         Node node = table.getNode();
         if (pluginReader != null && node != null) {
-            if (this.pluginReader.get(node.getType()) != null) {
-                return this.pluginReader.get(node.getType())
-                        .readNodeTable(table, true);
+            ProtocolService<IPluginInReadService> service =
+                this.pluginReader.get(node.getType());
+            if (service != null) {
+                return service.getService().readNodeTable(table, true);
             }
         }
-        logger.warn("Plugin unavailable");
+        logger.warn("Plugin {} unavailable", node.getType());
         return null;
     }
 
     @Override
     public List<NodeConnectorStatistics> nonCachedReadNodeConnectors(Node node) {
         if (pluginReader != null) {
-            if (this.pluginReader.get(node.getType()) != null) {
-                return this.pluginReader.get(node.getType())
-                        .readAllNodeConnector(node, false);
+            ProtocolService<IPluginInReadService> service =
+                this.pluginReader.get(node.getType());
+            if (service != null) {
+                return service.getService().readAllNodeConnector(node, false);
             }
         }
-        logger.warn("Plugin unavailable");
-        return null;
+        logger.warn("Plugin {} unavailable", node.getType());
+        return Collections.emptyList();
     }
 
     @Override
     public long getTransmitRate(NodeConnector connector) {
         Node node = connector.getNode();
         if (pluginReader != null && node != null) {
-            if (this.pluginReader.get(node.getType()) != null) {
-                return this.pluginReader.get(node.getType())
-                        .getTransmitRate(connector);
+            ProtocolService<IPluginInReadService> service =
+                this.pluginReader.get(node.getType());
+            if (service != null) {
+                return service.getService().getTransmitRate(connector);
             }
         }
-        logger.warn("Plugin unavailable");
+        logger.warn("Plugin {} unavailable", node.getType());
         return 0;
     }
 
+    @Override
+    public void nodeFlowStatisticsUpdated(Node node, List<FlowOnNode> flowStatsList) {
+        for (IReadServiceListener l : readerListeners){
+            l.nodeFlowStatisticsUpdated(node, flowStatsList);
+        }
+    }
+
+    @Override
+    public void nodeConnectorStatisticsUpdated(Node node, List<NodeConnectorStatistics> ncStatsList) {
+        for (IReadServiceListener l : readerListeners){
+            l.nodeConnectorStatisticsUpdated(node, ncStatsList);
+        }
+    }
+
+    @Override
+    public void nodeTableStatisticsUpdated(Node node, List<NodeTableStatistics> tableStatsList) {
+        for (IReadServiceListener l : readerListeners){
+            l.nodeTableStatisticsUpdated(node, tableStatsList);
+        }
+    }
+
+    @Override
+    public void descriptionStatisticsUpdated(Node node, NodeDescription nodeDescription) {
+        for (IReadServiceListener l : readerListeners){
+            l.descriptionStatisticsUpdated(node, nodeDescription);
+        }
+    }
+
     // ---------------- OSGI TEST CODE ------------------------------//
 
     private void registerWithOSGIConsole() {
@@ -345,16 +352,15 @@ public class ReadService implements IReadService, CommandProvider {
     public String getHelp() {
         StringBuffer help = new StringBuffer();
         help.append("---SAL Reader testing commands---\n");
-        help
-        .append("\t readflows <sid> <cached>  - Read all the (cached) flows from the openflow switch <sid>\n");
-        help
-        .append("\t readflow  <sid> <cached>  - Read the (cached) sample flow from the openflow switch <sid>\n");
-        help
-        .append("\t readdesc  <sid> <cached>  - Read the (cached) description from openflow switch <sid>\n");
-        help
-        .append("\t           cached=true/false. If false or not specified, the protocol plugin cached info\n");
-        help
-        .append("\t           is returned. If true, the info is directly retrieved from the switch\n");
+        help.append("\t readflows <sid> <cached> - Read all the (cached) flows from the openflow switch <sid>\n");
+        help.append("\t readflow  <sid> <cached> - Read the (cached) sample flow from the openflow switch <sid>\n");
+        help.append("\t readdescr <sid> <cached> - Read the (cached) description from openflow switch <sid>\n");
+        help.append("\t\t cached = (true|false). If false or not specified, the plugin cached info\n");
+        help.append("\t\t is returned. If true, the info is directly retrieved from the switch\n");
+        help.append("\t readport <sid> <port>    - Read port statistics for the specified port\n");
+        help.append("\t readports <sid>          - Read port statistics for all ports of specified switch\n");
+        help.append("\t readtable <sid> <tableid>- Read specified table statistics\n");
+
         return help.toString();
     }
 
@@ -567,5 +573,4 @@ public class ReadService implements IReadService, CommandProvider {
         actions.add(new Controller());
         return new Flow(match, actions);
     }
-
 }