2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
11 import java.util.Collections;
12 import java.util.HashSet;
13 import java.util.List;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.ConcurrentMap;
18 import java.util.concurrent.CopyOnWriteArrayList;
20 import org.opendaylight.controller.protocol_plugin.openflow.IDataPacketListen;
21 import org.opendaylight.controller.protocol_plugin.openflow.IDataPacketMux;
22 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
23 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
24 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener;
25 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
26 import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
27 import org.opendaylight.controller.sal.core.ConstructionException;
28 import org.opendaylight.controller.sal.core.ContainerFlow;
29 import org.opendaylight.controller.sal.core.IContainerAware;
30 import org.opendaylight.controller.sal.core.IContainerListener;
31 import org.opendaylight.controller.sal.core.Node;
32 import org.opendaylight.controller.sal.core.NodeConnector;
33 import org.opendaylight.controller.sal.core.Property;
34 import org.opendaylight.controller.sal.core.UpdateType;
35 import org.opendaylight.controller.sal.packet.IPluginOutDataPacketService;
36 import org.opendaylight.controller.sal.packet.PacketResult;
37 import org.opendaylight.controller.sal.packet.RawPacket;
38 import org.opendaylight.controller.sal.utils.GlobalConstants;
39 import org.opendaylight.controller.sal.utils.HexEncode;
40 import org.openflow.protocol.OFMessage;
41 import org.openflow.protocol.OFPacketIn;
42 import org.openflow.protocol.OFPacketOut;
43 import org.openflow.protocol.OFPort;
44 import org.openflow.protocol.OFType;
45 import org.openflow.protocol.action.OFAction;
46 import org.openflow.protocol.action.OFActionOutput;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
50 public class DataPacketMuxDemux implements IContainerListener,
51 IMessageListener, IDataPacketMux, IInventoryShimExternalListener, IContainerAware {
52 protected static final Logger logger = LoggerFactory
53 .getLogger(DataPacketMuxDemux.class);
54 private IController controller = null;
55 private ConcurrentMap<Long, ISwitch> swID2ISwitch = new ConcurrentHashMap<Long, ISwitch>();
56 // Gives a map between a Container and all the DataPacket listeners on SAL
57 private ConcurrentMap<String, IPluginOutDataPacketService> pluginOutDataPacketServices = new ConcurrentHashMap<String, IPluginOutDataPacketService>();
58 // Gives a map between a NodeConnector and the containers to which it
60 private ConcurrentMap<NodeConnector, List<String>> nc2Container = new ConcurrentHashMap<NodeConnector, List<String>>();
61 // Gives a map between a Container and the FlowSpecs on it
62 private ConcurrentMap<String, List<ContainerFlow>> container2FlowSpecs = new ConcurrentHashMap<String, List<ContainerFlow>>();
63 // Track local data packet listener
64 private List<IDataPacketListen> iDataPacketListen = new CopyOnWriteArrayList<IDataPacketListen>();
65 private IPluginOutConnectionService connectionOutService;
67 void setIDataPacketListen(IDataPacketListen s) {
68 if (this.iDataPacketListen != null) {
69 if (!this.iDataPacketListen.contains(s)) {
70 logger.debug("Added new IDataPacketListen");
71 this.iDataPacketListen.add(s);
76 void unsetIDataPacketListen(IDataPacketListen s) {
77 if (this.iDataPacketListen != null) {
78 if (this.iDataPacketListen.contains(s)) {
79 logger.debug("Removed IDataPacketListen");
80 this.iDataPacketListen.remove(s);
85 void setPluginOutDataPacketService(Map<String, Object> props,
86 IPluginOutDataPacketService s) {
88 logger.error("Didn't receive the service properties");
91 String containerName = (String) props.get("containerName");
92 if (containerName == null) {
93 logger.error("containerName not supplied");
96 if (this.pluginOutDataPacketServices != null) {
97 // It's expected only one SAL per container as long as the
98 // replication is done in the SAL implementation toward
100 this.pluginOutDataPacketServices.put(containerName, s);
101 logger.debug("New outService for container: {}", containerName);
105 void unsetPluginOutDataPacketService(Map<String, Object> props,
106 IPluginOutDataPacketService s) {
108 logger.error("Didn't receive the service properties");
111 String containerName = (String) props.get("containerName");
112 if (containerName == null) {
113 logger.error("containerName not supplied");
116 if (this.pluginOutDataPacketServices != null) {
117 this.pluginOutDataPacketServices.remove(containerName);
118 logger.debug("Removed outService for container: {}", containerName);
122 void setController(IController s) {
123 logger.debug("Controller provider set in DATAPACKET SERVICES");
127 void unsetController(IController s) {
128 if (this.controller == s) {
129 logger.debug("Controller provider UNset in DATAPACKET SERVICES");
130 this.controller = null;
134 void setIPluginOutConnectionService(IPluginOutConnectionService s) {
135 connectionOutService = s;
138 void unsetIPluginOutConnectionService(IPluginOutConnectionService s) {
139 if (connectionOutService == s) {
140 connectionOutService = null;
145 * Function called by the dependency manager when all the required
146 * dependencies are satisfied
150 this.controller.addMessageListener(OFType.PACKET_IN, this);
154 * Function called by the dependency manager when at least one dependency
155 * become unsatisfied or when the component is shutting down because for
156 * example bundle is being stopped.
160 this.controller.removeMessageListener(OFType.PACKET_IN, this);
162 // Clear state that may need to be reused on component restart
163 this.pluginOutDataPacketServices.clear();
164 this.nc2Container.clear();
165 this.container2FlowSpecs.clear();
166 this.controller = null;
167 this.swID2ISwitch.clear();
171 public void receive(ISwitch sw, OFMessage msg) {
172 if (sw == null || msg == null
173 || this.pluginOutDataPacketServices == null) {
174 // Something fishy, we cannot do anything
176 "sw: {} and/or msg: {} and/or pluginOutDataPacketServices: {} is null!",
177 new Object[] { sw, msg, this.pluginOutDataPacketServices });
181 Long ofSwitchID = Long.valueOf(sw.getId());
183 Node n = new Node(Node.NodeIDType.OPENFLOW, ofSwitchID);
184 if (!connectionOutService.isLocal(n)) {
185 logger.debug("Connection service refused DataPacketMuxDemux receive {} {}", sw, msg);
189 catch (Exception e) {
193 if (msg instanceof OFPacketIn) {
194 OFPacketIn ofPacket = (OFPacketIn) msg;
195 Short ofPortID = Short.valueOf(ofPacket.getInPort());
198 Node n = new Node(Node.NodeIDType.OPENFLOW, ofSwitchID);
199 NodeConnector p = PortConverter.toNodeConnector(ofPortID, n);
200 RawPacket dataPacket = new RawPacket(ofPacket.getPacketData());
201 dataPacket.setIncomingNodeConnector(p);
203 // Try to dispatch the packet locally, in here we will
204 // pass the parsed packet simply because once the
205 // packet infra is settled all the packets will passed
206 // around as parsed and read-only
207 for (int i = 0; i < this.iDataPacketListen.size(); i++) {
208 IDataPacketListen s = this.iDataPacketListen.get(i);
209 if (s.receiveDataPacket(dataPacket).equals(
210 PacketResult.CONSUME)) {
211 logger.trace("Consumed locally data packet");
216 // Now dispatch the packet toward SAL at least for
217 // default container, we need to revisit this in a general
218 // slicing architecture refresh
219 IPluginOutDataPacketService defaultOutService = this.pluginOutDataPacketServices
220 .get(GlobalConstants.DEFAULT.toString());
221 if (defaultOutService != null) {
222 defaultOutService.receiveDataPacket(dataPacket);
223 if (logger.isTraceEnabled()) {
225 "Dispatched to apps a frame of size: {} on " +
226 "container: {}: {}", new Object[] {
227 ofPacket.getPacketData().length,
228 GlobalConstants.DEFAULT.toString(),
229 HexEncode.bytesToHexString(dataPacket
230 .getPacketData()) });
233 // Now check the mapping between nodeConnector and
234 // Container and later on optimally filter based on
236 List<String> containersRX = this.nc2Container.get(p);
237 if (containersRX != null) {
238 for (int i = 0; i < containersRX.size(); i++) {
239 String container = containersRX.get(i);
240 IPluginOutDataPacketService s = this.pluginOutDataPacketServices
243 // TODO add filtering on a per-flowSpec base
244 s.receiveDataPacket(dataPacket);
245 if (logger.isTraceEnabled()) {
247 "Dispatched to apps a frame of size: {}" +
248 " on container: {}: {}", new Object[] {
249 ofPacket.getPacketData().length,
251 HexEncode.bytesToHexString(dataPacket
252 .getPacketData()) });
258 // This is supposed to be the catch all for all the
259 // DataPacket hence we will assume it has been handled
261 } catch (ConstructionException cex) {
264 // If we reach this point something went wrong.
267 // We don't care about non-data packets
273 public void transmitDataPacket(RawPacket outPkt) {
275 if (outPkt == null) {
276 logger.debug("outPkt is null!");
280 NodeConnector outPort = outPkt.getOutgoingNodeConnector();
281 if (outPort == null) {
282 logger.debug("outPort is null! outPkt: {}", outPkt);
286 if (!connectionOutService.isLocal(outPort.getNode())) {
287 logger.debug("data packets will not be sent to {} in a non-master controller", outPort.toString());
292 if (!outPort.getType().equals(
293 NodeConnector.NodeConnectorIDType.OPENFLOW)) {
294 // The output Port is not of type OpenFlow
295 logger.debug("outPort is not OF Type! outPort: {}", outPort);
299 Short port = (Short) outPort.getID();
300 Long swID = (Long) outPort.getNode().getID();
301 ISwitch sw = this.swID2ISwitch.get(swID);
304 // If we cannot get the controller descriptor we cannot even
305 // send out the frame
306 logger.debug("swID: {} - sw is null!", swID);
310 byte[] data = outPkt.getPacketData();
312 OFActionOutput action = new OFActionOutput().setPort(port);
314 OFPacketOut po = new OFPacketOut()
315 .setBufferId(OFPacketOut.BUFFER_ID_NONE)
316 .setInPort(OFPort.OFPP_NONE)
317 .setActions(Collections.singletonList((OFAction) action))
318 .setActionsLength((short) OFActionOutput.MINIMUM_LENGTH);
320 po.setLengthU(OFPacketOut.MINIMUM_LENGTH + po.getActionsLength()
322 po.setPacketData(data);
324 // send PACKET_OUT at high priority
325 sw.asyncFastSend(po);
326 logger.trace("Transmitted a frame of size: {}", data.length);
329 public void addNode(Node node, Set<Property> props) {
331 logger.debug("node is null!");
335 long sid = (Long) node.getID();
336 ISwitch sw = controller.getSwitches().get(sid);
338 logger.debug("sid: {} - sw is null!", sid);
341 this.swID2ISwitch.put(sw.getId(), sw);
344 public void removeNode(Node node) {
346 logger.debug("node is null!");
350 long sid = (Long) node.getID();
351 ISwitch sw = controller.getSwitches().get(sid);
353 logger.debug("sid: {} - sw is null!", sid);
356 this.swID2ISwitch.remove(sw.getId());
360 public void tagUpdated(String containerName, Node n, short oldTag,
361 short newTag, UpdateType t) {
366 public void containerFlowUpdated(String containerName,
367 ContainerFlow previousFlow, ContainerFlow currentFlow, UpdateType t) {
368 if (this.container2FlowSpecs == null) {
369 logger.error("container2FlowSpecs is NULL");
372 List<ContainerFlow> fSpecs = this.container2FlowSpecs
374 if (fSpecs == null) {
375 fSpecs = new CopyOnWriteArrayList<ContainerFlow>();
379 if (!fSpecs.contains(currentFlow)) {
380 fSpecs.add(currentFlow);
382 container2FlowSpecs.put(containerName, fSpecs);
385 fSpecs.remove(currentFlow);
393 public void nodeConnectorUpdated(String containerName, NodeConnector p,
395 if (this.nc2Container == null) {
396 logger.error("nc2Container is NULL");
399 List<String> containers = this.nc2Container.get(p);
400 if (containers == null) {
401 containers = new CopyOnWriteArrayList<String>();
403 boolean updateMap = false;
406 if (!containers.contains(containerName)) {
407 containers.add(containerName);
412 if (containers.contains(containerName)) {
413 containers.remove(containerName);
421 if (containers.isEmpty()) {
422 // Do cleanup to reduce memory footprint if no
423 // elements to be tracked
424 this.nc2Container.remove(p);
426 this.nc2Container.put(p, containers);
432 public void containerModeUpdated(UpdateType t) {
437 public void updateNode(Node node, UpdateType type, Set<Property> props) {
440 addNode(node, props);
451 public void updateNodeConnector(NodeConnector nodeConnector,
452 UpdateType type, Set<Property> props) {
457 public void containerCreate(String containerName) {
462 public void containerDestroy(String containerName) {
463 Set<NodeConnector> removeNodeConnectorSet = new HashSet<NodeConnector>();
464 for (Map.Entry<NodeConnector, List<String>> entry : nc2Container.entrySet()) {
465 List<String> ncContainers = entry.getValue();
466 if (ncContainers.contains(containerName)) {
467 NodeConnector nodeConnector = entry.getKey();
468 removeNodeConnectorSet.add(nodeConnector);
471 for (NodeConnector nodeConnector : removeNodeConnectorSet) {
472 List<String> ncContainers = nc2Container.get(nodeConnector);
473 ncContainers.remove(containerName);
474 if (ncContainers.isEmpty()) {
475 nc2Container.remove(nodeConnector);
478 container2FlowSpecs.remove(containerName);